1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelException;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelFutureListener;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.ConnectTimeoutException;
29 import io.netty.channel.EventLoop;
30 import io.netty.util.ReferenceCountUtil;
31 import io.netty.util.ReferenceCounted;
32 import io.netty.util.internal.ThrowableUtil;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.io.IOException;
37 import java.net.SocketAddress;
38 import java.nio.channels.CancelledKeyException;
39 import java.nio.channels.ClosedChannelException;
40 import java.nio.channels.ConnectionPendingException;
41 import java.nio.channels.SelectableChannel;
42 import java.nio.channels.SelectionKey;
43 import java.util.concurrent.ScheduledFuture;
44 import java.util.concurrent.TimeUnit;
45
46
47
48
49 public abstract class AbstractNioChannel extends AbstractChannel {
50
51 private static final InternalLogger logger =
52 InternalLoggerFactory.getInstance(AbstractNioChannel.class);
53
54 private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
55 new ClosedChannelException(), AbstractNioChannel.class, "doClose()");
56
57 private final SelectableChannel ch;
58 protected final int readInterestOp;
59 volatile SelectionKey selectionKey;
60 private volatile boolean inputShutdown;
61 private volatile boolean readPending;
62
63
64
65
66
67 private ChannelPromise connectPromise;
68 private ScheduledFuture<?> connectTimeoutFuture;
69 private SocketAddress requestedRemoteAddress;
70
71
72
73
74
75
76
77
/**
 * Creates a new instance and puts the given channel into non-blocking mode.
 *
 * @param parent          the parent {@link Channel} handed to the super constructor
 * @param ch              the underlying {@link SelectableChannel} on which this channel operates
 * @param readInterestOp  the interest op(s) to set in order to receive data from {@code ch}
 * @throws ChannelException if {@code ch} cannot be switched to non-blocking mode; the channel
 *                          is closed (best effort) before the exception is thrown
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;

    try {
        ch.configureBlocking(false);
    } catch (IOException configureFailure) {
        closeAfterFailedInit(ch);
        throw new ChannelException("Failed to enter non-blocking mode.", configureFailure);
    }
}

/**
 * Closes a channel whose initialization failed. A failure to close is only logged so that it
 * does not mask the original initialization error.
 */
private static void closeAfterFailedInit(SelectableChannel channel) {
    try {
        channel.close();
    } catch (IOException closeFailure) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close a partially initialized socket.", closeFailure);
        }
    }
}
97
98 @Override
99 public boolean isOpen() {
100 return ch.isOpen();
101 }
102
103 @Override
104 public NioUnsafe unsafe() {
105 return (NioUnsafe) super.unsafe();
106 }
107
108 protected SelectableChannel javaChannel() {
109 return ch;
110 }
111
112 @Override
113 public NioEventLoop eventLoop() {
114 return (NioEventLoop) super.eventLoop();
115 }
116
117
118
119
120 protected SelectionKey selectionKey() {
121 assert selectionKey != null;
122 return selectionKey;
123 }
124
125 protected boolean isReadPending() {
126 return readPending;
127 }
128
129 protected void setReadPending(boolean readPending) {
130 this.readPending = readPending;
131 }
132
133
134
135
136 protected boolean isInputShutdown() {
137 return inputShutdown;
138 }
139
140
141
142
143 void setInputShutdown() {
144 inputShutdown = true;
145 }
146
147
148
149
150 public interface NioUnsafe extends Unsafe {
151
152
153
154 SelectableChannel ch();
155
156
157
158
159 void finishConnect();
160
161
162
163
164 void read();
165
166 void forceFlush();
167 }
168
169 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
170
171 protected final void removeReadOp() {
172 SelectionKey key = selectionKey();
173
174
175
176 if (!key.isValid()) {
177 return;
178 }
179 int interestOps = key.interestOps();
180 if ((interestOps & readInterestOp) != 0) {
181
182 key.interestOps(interestOps & ~readInterestOp);
183 }
184 }
185
186 @Override
187 public final SelectableChannel ch() {
188 return javaChannel();
189 }
190
191 @Override
192 public final void connect(
193 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
194 if (!promise.setUncancellable() || !ensureOpen(promise)) {
195 return;
196 }
197
198 try {
199 if (connectPromise != null) {
200
201 throw new ConnectionPendingException();
202 }
203
204 boolean wasActive = isActive();
205 if (doConnect(remoteAddress, localAddress)) {
206 fulfillConnectPromise(promise, wasActive);
207 } else {
208 connectPromise = promise;
209 requestedRemoteAddress = remoteAddress;
210
211
212 int connectTimeoutMillis = config().getConnectTimeoutMillis();
213 if (connectTimeoutMillis > 0) {
214 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
215 @Override
216 public void run() {
217 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
218 ConnectTimeoutException cause =
219 new ConnectTimeoutException("connection timed out: " + remoteAddress);
220 if (connectPromise != null && connectPromise.tryFailure(cause)) {
221 close(voidPromise());
222 }
223 }
224 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
225 }
226
227 promise.addListener(new ChannelFutureListener() {
228 @Override
229 public void operationComplete(ChannelFuture future) throws Exception {
230 if (future.isCancelled()) {
231 if (connectTimeoutFuture != null) {
232 connectTimeoutFuture.cancel(false);
233 }
234 connectPromise = null;
235 close(voidPromise());
236 }
237 }
238 });
239 }
240 } catch (Throwable t) {
241 promise.tryFailure(annotateConnectException(t, remoteAddress));
242 closeIfClosed();
243 }
244 }
245
246 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
247 if (promise == null) {
248
249 return;
250 }
251
252
253
254 boolean active = isActive();
255
256
257 boolean promiseSet = promise.trySuccess();
258
259
260
261 if (!wasActive && active) {
262 pipeline().fireChannelActive();
263 }
264
265
266 if (!promiseSet) {
267 close(voidPromise());
268 }
269 }
270
271 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
272 if (promise == null) {
273
274 return;
275 }
276
277
278 promise.tryFailure(cause);
279 closeIfClosed();
280 }
281
282 @Override
283 public final void finishConnect() {
284
285
286
287 assert eventLoop().inEventLoop();
288
289 try {
290 boolean wasActive = isActive();
291 doFinishConnect();
292 fulfillConnectPromise(connectPromise, wasActive);
293 } catch (Throwable t) {
294 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
295 } finally {
296
297
298 if (connectTimeoutFuture != null) {
299 connectTimeoutFuture.cancel(false);
300 }
301 connectPromise = null;
302 }
303 }
304
305 @Override
306 protected final void flush0() {
307
308
309
310 if (isFlushPending()) {
311 return;
312 }
313 super.flush0();
314 }
315
316 @Override
317 public final void forceFlush() {
318
319 super.flush0();
320 }
321
322 private boolean isFlushPending() {
323 SelectionKey selectionKey = selectionKey();
324 return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
325 }
326 }
327
328 @Override
329 protected boolean isCompatible(EventLoop loop) {
330 return loop instanceof NioEventLoop;
331 }
332
333 @Override
334 protected void doRegister() throws Exception {
335 boolean selected = false;
336 for (;;) {
337 try {
338 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
339 return;
340 } catch (CancelledKeyException e) {
341 if (!selected) {
342
343
344 eventLoop().selectNow();
345 selected = true;
346 } else {
347
348
349 throw e;
350 }
351 }
352 }
353 }
354
355 @Override
356 protected void doDeregister() throws Exception {
357 eventLoop().cancel(selectionKey());
358 }
359
360 @Override
361 protected void doBeginRead() throws Exception {
362
363 if (inputShutdown) {
364 return;
365 }
366
367 final SelectionKey selectionKey = this.selectionKey;
368 if (!selectionKey.isValid()) {
369 return;
370 }
371
372 readPending = true;
373
374 final int interestOps = selectionKey.interestOps();
375 if ((interestOps & readInterestOp) == 0) {
376 selectionKey.interestOps(interestOps | readInterestOp);
377 }
378 }
379
380
381
382
383 protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
384
385
386
387
388 protected abstract void doFinishConnect() throws Exception;
389
390
391
392
393
394
395 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
396 final int readableBytes = buf.readableBytes();
397 if (readableBytes == 0) {
398 ReferenceCountUtil.safeRelease(buf);
399 return Unpooled.EMPTY_BUFFER;
400 }
401
402 final ByteBufAllocator alloc = alloc();
403 if (alloc.isDirectBufferPooled()) {
404 ByteBuf directBuf = alloc.directBuffer(readableBytes);
405 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
406 ReferenceCountUtil.safeRelease(buf);
407 return directBuf;
408 }
409
410 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
411 if (directBuf != null) {
412 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
413 ReferenceCountUtil.safeRelease(buf);
414 return directBuf;
415 }
416
417
418 return buf;
419 }
420
421
422
423
424
425
426
427 protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
428 final int readableBytes = buf.readableBytes();
429 if (readableBytes == 0) {
430 ReferenceCountUtil.safeRelease(holder);
431 return Unpooled.EMPTY_BUFFER;
432 }
433
434 final ByteBufAllocator alloc = alloc();
435 if (alloc.isDirectBufferPooled()) {
436 ByteBuf directBuf = alloc.directBuffer(readableBytes);
437 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
438 ReferenceCountUtil.safeRelease(holder);
439 return directBuf;
440 }
441
442 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
443 if (directBuf != null) {
444 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
445 ReferenceCountUtil.safeRelease(holder);
446 return directBuf;
447 }
448
449
450 if (holder != buf) {
451
452 buf.retain();
453 ReferenceCountUtil.safeRelease(holder);
454 }
455
456 return buf;
457 }
458
459 @Override
460 protected void doClose() throws Exception {
461 ChannelPromise promise = connectPromise;
462 if (promise != null) {
463
464 promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
465 connectPromise = null;
466 }
467
468 ScheduledFuture<?> future = connectTimeoutFuture;
469 if (future != null) {
470 future.cancel(false);
471 connectTimeoutFuture = null;
472 }
473 }
474 }