1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.pool;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelFutureListener;
22 import io.netty.channel.ChannelInitializer;
23 import io.netty.channel.EventLoop;
24 import io.netty.util.AttributeKey;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.FutureListener;
27 import io.netty.util.concurrent.GlobalEventExecutor;
28 import io.netty.util.concurrent.Promise;
29 import io.netty.util.internal.PlatformDependent;
30
31 import java.util.Deque;
32 import java.util.concurrent.Callable;
33
34 import static io.netty.util.internal.ObjectUtil.*;
35
36
37
38
39
40
41
42
43 public class SimpleChannelPool implements ChannelPool {
44 private static final AttributeKey<SimpleChannelPool> POOL_KEY =
45 AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool");
46 private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
47 private final ChannelPoolHandler handler;
48 private final ChannelHealthChecker healthCheck;
49 private final Bootstrap bootstrap;
50 private final boolean releaseHealthCheck;
51 private final boolean lastRecentUsed;
52
53
54
55
56
57
58
59 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler) {
60 this(bootstrap, handler, ChannelHealthChecker.ACTIVE);
61 }
62
63
64
65
66
67
68
69
70
71 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
72 this(bootstrap, handler, healthCheck, true);
73 }
74
75
76
77
78
79
80
81
82
83
84
85 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
86 boolean releaseHealthCheck) {
87 this(bootstrap, handler, healthCheck, releaseHealthCheck, true);
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101 public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
102 boolean releaseHealthCheck, boolean lastRecentUsed) {
103 this.handler = checkNotNull(handler, "handler");
104 this.healthCheck = checkNotNull(healthCheck, "healthCheck");
105 this.releaseHealthCheck = releaseHealthCheck;
106
107 this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
108 this.bootstrap.handler(new ChannelInitializer<Channel>() {
109 @Override
110 protected void initChannel(Channel ch) throws Exception {
111 assert ch.eventLoop().inEventLoop();
112 handler.channelCreated(ch);
113 }
114 });
115 this.lastRecentUsed = lastRecentUsed;
116 }
117
118
119
120
121
122
123 protected Bootstrap bootstrap() {
124 return bootstrap;
125 }
126
127
128
129
130
131
132 protected ChannelPoolHandler handler() {
133 return handler;
134 }
135
136
137
138
139
140
141 protected ChannelHealthChecker healthChecker() {
142 return healthCheck;
143 }
144
145
146
147
148
149
150
151 protected boolean releaseHealthCheck() {
152 return releaseHealthCheck;
153 }
154
155 @Override
156 public final Future<Channel> acquire() {
157 return acquire(bootstrap.config().group().next().<Channel>newPromise());
158 }
159
160 @Override
161 public Future<Channel> acquire(final Promise<Channel> promise) {
162 return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
163 }
164
165
166
167
168
169
170 private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
171 try {
172 final Channel ch = pollChannel();
173 if (ch == null) {
174
175 Bootstrap bs = bootstrap.clone();
176 bs.attr(POOL_KEY, this);
177 ChannelFuture f = connectChannel(bs);
178 if (f.isDone()) {
179 notifyConnect(f, promise);
180 } else {
181 f.addListener(new ChannelFutureListener() {
182 @Override
183 public void operationComplete(ChannelFuture future) throws Exception {
184 notifyConnect(future, promise);
185 }
186 });
187 }
188 } else {
189 EventLoop loop = ch.eventLoop();
190 if (loop.inEventLoop()) {
191 doHealthCheck(ch, promise);
192 } else {
193 loop.execute(new Runnable() {
194 @Override
195 public void run() {
196 doHealthCheck(ch, promise);
197 }
198 });
199 }
200 }
201 } catch (Throwable cause) {
202 promise.tryFailure(cause);
203 }
204 return promise;
205 }
206
207 private void notifyConnect(ChannelFuture future, Promise<Channel> promise) {
208 Channel channel = null;
209 try {
210 if (future.isSuccess()) {
211 channel = future.channel();
212 handler.channelAcquired(channel);
213 if (!promise.trySuccess(channel)) {
214
215 release(channel);
216 }
217 } else {
218 promise.tryFailure(future.cause());
219 }
220 } catch (Throwable cause) {
221 closeAndFail(channel, cause, promise);
222 }
223 }
224
225 private void doHealthCheck(final Channel channel, final Promise<Channel> promise) {
226 try {
227 assert channel.eventLoop().inEventLoop();
228 Future<Boolean> f = healthCheck.isHealthy(channel);
229 if (f.isDone()) {
230 notifyHealthCheck(f, channel, promise);
231 } else {
232 f.addListener(new FutureListener<Boolean>() {
233 @Override
234 public void operationComplete(Future<Boolean> future) {
235 notifyHealthCheck(future, channel, promise);
236 }
237 });
238 }
239 } catch (Throwable cause) {
240 closeAndFail(channel, cause, promise);
241 }
242 }
243
244 private void notifyHealthCheck(Future<Boolean> future, Channel channel, Promise<Channel> promise) {
245 try {
246 assert channel.eventLoop().inEventLoop();
247 if (future.isSuccess() && future.getNow()) {
248 channel.attr(POOL_KEY).set(this);
249 handler.channelAcquired(channel);
250 promise.setSuccess(channel);
251 } else {
252 closeChannel(channel);
253 acquireHealthyFromPoolOrNew(promise);
254 }
255 } catch (Throwable cause) {
256 closeAndFail(channel, cause, promise);
257 }
258 }
259
260
261
262
263
264
265
266 protected ChannelFuture connectChannel(Bootstrap bs) {
267 return bs.connect();
268 }
269
270 @Override
271 public final Future<Void> release(Channel channel) {
272 return release(channel, channel.eventLoop().<Void>newPromise());
273 }
274
275 @Override
276 public Future<Void> release(final Channel channel, final Promise<Void> promise) {
277 try {
278 checkNotNull(channel, "channel");
279 checkNotNull(promise, "promise");
280 EventLoop loop = channel.eventLoop();
281 if (loop.inEventLoop()) {
282 doReleaseChannel(channel, promise);
283 } else {
284 loop.execute(new Runnable() {
285 @Override
286 public void run() {
287 doReleaseChannel(channel, promise);
288 }
289 });
290 }
291 } catch (Throwable cause) {
292 closeAndFail(channel, cause, promise);
293 }
294 return promise;
295 }
296
297 private void doReleaseChannel(Channel channel, Promise<Void> promise) {
298 try {
299 assert channel.eventLoop().inEventLoop();
300
301 if (channel.attr(POOL_KEY).getAndSet(null) != this) {
302 closeAndFail(channel,
303
304 new IllegalArgumentException(
305 "Channel " + channel + " was not acquired from this ChannelPool"),
306 promise);
307 } else {
308 if (releaseHealthCheck) {
309 doHealthCheckOnRelease(channel, promise);
310 } else {
311 releaseAndOffer(channel, promise);
312 }
313 }
314 } catch (Throwable cause) {
315 closeAndFail(channel, cause, promise);
316 }
317 }
318
319 private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
320 final Future<Boolean> f = healthCheck.isHealthy(channel);
321 if (f.isDone()) {
322 releaseAndOfferIfHealthy(channel, promise, f);
323 } else {
324 f.addListener(new FutureListener<Boolean>() {
325 @Override
326 public void operationComplete(Future<Boolean> future) throws Exception {
327 releaseAndOfferIfHealthy(channel, promise, f);
328 }
329 });
330 }
331 }
332
333
334
335
336
337
338
339
340 private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future) {
341 try {
342 if (future.getNow()) {
343 releaseAndOffer(channel, promise);
344 } else {
345 handler.channelReleased(channel);
346 promise.setSuccess(null);
347 }
348 } catch (Throwable cause) {
349 closeAndFail(channel, cause, promise);
350 }
351 }
352
353 private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
354 if (offerChannel(channel)) {
355 handler.channelReleased(channel);
356 promise.setSuccess(null);
357 } else {
358 closeAndFail(channel, new ChannelPoolFullException(), promise);
359 }
360 }
361
362 private void closeChannel(Channel channel) throws Exception {
363 channel.attr(POOL_KEY).getAndSet(null);
364 channel.close();
365 }
366
367 private void closeAndFail(Channel channel, Throwable cause, Promise<?> promise) {
368 if (channel != null) {
369 try {
370 closeChannel(channel);
371 } catch (Throwable t) {
372 promise.tryFailure(t);
373 }
374 }
375 promise.tryFailure(cause);
376 }
377
378
379
380
381
382
383
384
385 protected Channel pollChannel() {
386 return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
387 }
388
389
390
391
392
393
394
395
396 protected boolean offerChannel(Channel channel) {
397 return deque.offer(channel);
398 }
399
400 @Override
401 public void close() {
402 for (;;) {
403 Channel channel = pollChannel();
404 if (channel == null) {
405 break;
406 }
407
408 channel.close().awaitUninterruptibly();
409 }
410 }
411
412
413
414
415
416
417 public Future<Void> closeAsync() {
418
419 return GlobalEventExecutor.INSTANCE.submit(new Callable<Void>() {
420 @Override
421 public Void call() throws Exception {
422 close();
423 return null;
424 }
425 });
426 }
427
428 private static final class ChannelPoolFullException extends IllegalStateException {
429
430 private ChannelPoolFullException() {
431 super("ChannelPool full");
432 }
433
434
435 @Override
436 public Throwable fillInStackTrace() {
437 return this;
438 }
439 }
440 }