1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.pool;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import io.netty.bootstrap.Bootstrap;
21 import io.netty.channel.Channel;
22 import io.netty.util.concurrent.EventExecutor;
23 import io.netty.util.concurrent.Future;
24 import io.netty.util.concurrent.FutureListener;
25 import io.netty.util.concurrent.GlobalEventExecutor;
26 import io.netty.util.concurrent.Promise;
27 import io.netty.util.internal.ObjectUtil;
28
29 import java.nio.channels.ClosedChannelException;
30 import java.util.ArrayDeque;
31 import java.util.Queue;
32 import java.util.concurrent.atomic.AtomicInteger;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.ScheduledFuture;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37
38
39
40
41
42 public class FixedChannelPool extends SimpleChannelPool {
43
44 public enum AcquireTimeoutAction {
45
46
47
48 NEW,
49
50
51
52
53 FAIL
54 }
55
56 private final EventExecutor executor;
57 private final long acquireTimeoutNanos;
58 private final Runnable timeoutTask;
59
60
61
62 private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<AcquireTask>();
63 private final int maxConnections;
64 private final int maxPendingAcquires;
65 private final AtomicInteger acquiredChannelCount = new AtomicInteger();
66 private int pendingAcquireCount;
67 private boolean closed;
68
69
70
71
72
73
74
75
76
77 public FixedChannelPool(Bootstrap bootstrap,
78 ChannelPoolHandler handler, int maxConnections) {
79 this(bootstrap, handler, maxConnections, Integer.MAX_VALUE);
80 }
81
82
83
84
85
86
87
88
89
90
91
92
93 public FixedChannelPool(Bootstrap bootstrap,
94 ChannelPoolHandler handler, int maxConnections, int maxPendingAcquires) {
95 this(bootstrap, handler, ChannelHealthChecker.ACTIVE, null, -1, maxConnections, maxPendingAcquires);
96 }
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 public FixedChannelPool(Bootstrap bootstrap,
116 ChannelPoolHandler handler,
117 ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
118 final long acquireTimeoutMillis,
119 int maxConnections, int maxPendingAcquires) {
120 this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires, true);
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142 public FixedChannelPool(Bootstrap bootstrap,
143 ChannelPoolHandler handler,
144 ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
145 final long acquireTimeoutMillis,
146 int maxConnections, int maxPendingAcquires, final boolean releaseHealthCheck) {
147 this(bootstrap, handler, healthCheck, action, acquireTimeoutMillis, maxConnections, maxPendingAcquires,
148 releaseHealthCheck, true);
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171 public FixedChannelPool(Bootstrap bootstrap,
172 ChannelPoolHandler handler,
173 ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
174 final long acquireTimeoutMillis,
175 int maxConnections, int maxPendingAcquires,
176 boolean releaseHealthCheck, boolean lastRecentUsed) {
177 super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
178 checkPositive(maxConnections, "maxConnections");
179 checkPositive(maxPendingAcquires, "maxPendingAcquires");
180 if (action == null && acquireTimeoutMillis == -1) {
181 timeoutTask = null;
182 acquireTimeoutNanos = -1;
183 } else if (action == null && acquireTimeoutMillis != -1) {
184 throw new NullPointerException("action");
185 } else if (action != null && acquireTimeoutMillis < 0) {
186 throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
187 } else {
188 acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
189 switch (action) {
190 case FAIL:
191 timeoutTask = new TimeoutTask() {
192 @Override
193 public void onTimeout(AcquireTask task) {
194
195 task.promise.setFailure(new AcquireTimeoutException());
196 }
197 };
198 break;
199 case NEW:
200 timeoutTask = new TimeoutTask() {
201 @Override
202 public void onTimeout(AcquireTask task) {
203
204
205 task.acquired();
206
207 FixedChannelPool.super.acquire(task.promise);
208 }
209 };
210 break;
211 default:
212 throw new Error();
213 }
214 }
215 executor = bootstrap.config().group().next();
216 this.maxConnections = maxConnections;
217 this.maxPendingAcquires = maxPendingAcquires;
218 }
219
220
221 public int acquiredChannelCount() {
222 return acquiredChannelCount.get();
223 }
224
225 @Override
226 public Future<Channel> acquire(final Promise<Channel> promise) {
227 try {
228 if (executor.inEventLoop()) {
229 acquire0(promise);
230 } else {
231 executor.execute(new Runnable() {
232 @Override
233 public void run() {
234 acquire0(promise);
235 }
236 });
237 }
238 } catch (Throwable cause) {
239 promise.tryFailure(cause);
240 }
241 return promise;
242 }
243
244 private void acquire0(final Promise<Channel> promise) {
245 try {
246 assert executor.inEventLoop();
247
248 if (closed) {
249 promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
250 return;
251 }
252 if (acquiredChannelCount.get() < maxConnections) {
253 assert acquiredChannelCount.get() >= 0;
254
255
256
257 Promise<Channel> p = executor.newPromise();
258 AcquireListener l = new AcquireListener(promise);
259 l.acquired();
260 p.addListener(l);
261 super.acquire(p);
262 } else {
263 if (pendingAcquireCount >= maxPendingAcquires) {
264 tooManyOutstanding(promise);
265 } else {
266 AcquireTask task = new AcquireTask(promise);
267 if (pendingAcquireQueue.offer(task)) {
268 ++pendingAcquireCount;
269
270 if (timeoutTask != null) {
271 task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos,
272 TimeUnit.NANOSECONDS);
273 }
274 } else {
275 tooManyOutstanding(promise);
276 }
277 }
278
279 assert pendingAcquireCount > 0;
280 }
281 } catch (Throwable cause) {
282 promise.tryFailure(cause);
283 }
284 }
285
286 private void tooManyOutstanding(Promise<?> promise) {
287 promise.setFailure(new IllegalStateException("Too many outstanding acquire operations"));
288 }
289
290 @Override
291 public Future<Void> release(final Channel channel, final Promise<Void> promise) {
292 ObjectUtil.checkNotNull(promise, "promise");
293 final Promise<Void> p = executor.newPromise();
294 super.release(channel, p.addListener(new FutureListener<Void>() {
295
296 @Override
297 public void operationComplete(Future<Void> future) {
298 try {
299 assert executor.inEventLoop();
300
301 if (closed) {
302
303 channel.close();
304 promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
305 return;
306 }
307
308 if (future.isSuccess()) {
309 decrementAndRunTaskQueue();
310 promise.setSuccess(null);
311 } else {
312 Throwable cause = future.cause();
313
314 if (!(cause instanceof IllegalArgumentException)) {
315 decrementAndRunTaskQueue();
316 }
317 promise.setFailure(future.cause());
318 }
319 } catch (Throwable cause) {
320 promise.tryFailure(cause);
321 }
322 }
323 }));
324 return promise;
325 }
326
327 private void decrementAndRunTaskQueue() {
328
329 int currentCount = acquiredChannelCount.decrementAndGet();
330 assert currentCount >= 0;
331
332
333
334
335
336 runTaskQueue();
337 }
338
339 private void runTaskQueue() {
340 while (acquiredChannelCount.get() < maxConnections) {
341 AcquireTask task = pendingAcquireQueue.poll();
342 if (task == null) {
343 break;
344 }
345
346
347 ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
348 if (timeoutFuture != null) {
349 timeoutFuture.cancel(false);
350 }
351
352 --pendingAcquireCount;
353 task.acquired();
354
355 super.acquire(task.promise);
356 }
357
358
359 assert pendingAcquireCount >= 0;
360 assert acquiredChannelCount.get() >= 0;
361 }
362
363
364 private final class AcquireTask extends AcquireListener {
365 final Promise<Channel> promise;
366 final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
367 ScheduledFuture<?> timeoutFuture;
368
369 AcquireTask(Promise<Channel> promise) {
370 super(promise);
371
372
373 this.promise = executor.<Channel>newPromise().addListener(this);
374 }
375 }
376
377 private abstract class TimeoutTask implements Runnable {
378 @Override
379 public final void run() {
380 assert executor.inEventLoop();
381 long nanoTime = System.nanoTime();
382 for (;;) {
383 AcquireTask task = pendingAcquireQueue.peek();
384
385
386
387
388 if (task == null || nanoTime - task.expireNanoTime < 0) {
389 break;
390 }
391 pendingAcquireQueue.remove();
392
393 --pendingAcquireCount;
394 onTimeout(task);
395 }
396 }
397
398 public abstract void onTimeout(AcquireTask task);
399 }
400
401 private class AcquireListener implements FutureListener<Channel> {
402 private final Promise<Channel> originalPromise;
403 protected boolean acquired;
404
405 AcquireListener(Promise<Channel> originalPromise) {
406 this.originalPromise = originalPromise;
407 }
408
409 @Override
410 public void operationComplete(Future<Channel> future) throws Exception {
411 try {
412 assert executor.inEventLoop();
413
414 if (closed) {
415 if (future.isSuccess()) {
416
417 future.getNow().close();
418 }
419 originalPromise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
420 return;
421 }
422
423 if (future.isSuccess()) {
424 originalPromise.setSuccess(future.getNow());
425 } else {
426 if (acquired) {
427 decrementAndRunTaskQueue();
428 } else {
429 runTaskQueue();
430 }
431
432 originalPromise.setFailure(future.cause());
433 }
434 } catch (Throwable cause) {
435 originalPromise.tryFailure(cause);
436 }
437 }
438
439 public void acquired() {
440 if (acquired) {
441 return;
442 }
443 acquiredChannelCount.incrementAndGet();
444 acquired = true;
445 }
446 }
447
448 @Override
449 public void close() {
450 try {
451 closeAsync().await();
452 } catch (InterruptedException e) {
453 Thread.currentThread().interrupt();
454 throw new RuntimeException(e);
455 }
456 }
457
458
459
460
461
462
463 @Override
464 public Future<Void> closeAsync() {
465 if (executor.inEventLoop()) {
466 return close0();
467 } else {
468 final Promise<Void> closeComplete = executor.newPromise();
469 executor.execute(new Runnable() {
470 @Override
471 public void run() {
472 close0().addListener(new FutureListener<Void>() {
473 @Override
474 public void operationComplete(Future<Void> f) throws Exception {
475 if (f.isSuccess()) {
476 closeComplete.setSuccess(null);
477 } else {
478 closeComplete.setFailure(f.cause());
479 }
480 }
481 });
482 }
483 });
484 return closeComplete;
485 }
486 }
487
488 private Future<Void> close0() {
489 assert executor.inEventLoop();
490
491 if (!closed) {
492 closed = true;
493 for (;;) {
494 AcquireTask task = pendingAcquireQueue.poll();
495 if (task == null) {
496 break;
497 }
498 ScheduledFuture<?> f = task.timeoutFuture;
499 if (f != null) {
500 f.cancel(false);
501 }
502 task.promise.setFailure(new ClosedChannelException());
503 }
504 acquiredChannelCount.set(0);
505 pendingAcquireCount = 0;
506
507
508
509 return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
510 @Override
511 public Void call() throws Exception {
512 FixedChannelPool.super.close();
513 return null;
514 }
515 });
516 }
517
518 return GlobalEventExecutor.INSTANCE.newSucceededFuture(null);
519 }
520
521 private static final class AcquireTimeoutException extends TimeoutException {
522
523 private AcquireTimeoutException() {
524 super("Acquire operation took longer then configured maximum time");
525 }
526
527
528 @Override
529 public Throwable fillInStackTrace() {
530 return this;
531 }
532 }
533 }