1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.socket.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelException;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelOption;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.EventLoop;
27 import io.netty.channel.FileRegion;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.nio.AbstractNioByteChannel;
30 import io.netty.channel.socket.DefaultSocketChannelConfig;
31 import io.netty.channel.socket.ServerSocketChannel;
32 import io.netty.channel.socket.SocketChannelConfig;
33 import io.netty.util.concurrent.GlobalEventExecutor;
34 import io.netty.util.internal.PlatformDependent;
35 import io.netty.util.internal.SocketUtils;
36 import io.netty.util.internal.SuppressJava6Requirement;
37 import io.netty.util.internal.UnstableApi;
38 import io.netty.util.internal.logging.InternalLogger;
39 import io.netty.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.IOException;
42 import java.net.InetSocketAddress;
43 import java.net.Socket;
44 import java.net.SocketAddress;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.SelectionKey;
47 import java.nio.channels.SocketChannel;
48 import java.nio.channels.spi.SelectorProvider;
49 import java.util.Map;
50 import java.util.concurrent.Executor;
51
52 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53
54
55
56
57 public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
58 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
59 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
60
61 private static SocketChannel newSocket(SelectorProvider provider) {
62 try {
63
64
65
66
67
68
69 return provider.openSocketChannel();
70 } catch (IOException e) {
71 throw new ChannelException("Failed to open a socket.", e);
72 }
73 }
74
/** Configuration object for this channel; assigned exactly once by the constructor. */
private final SocketChannelConfig config;

/**
 * Creates a new instance backed by a socket opened from the default {@link SelectorProvider}.
 */
public NioSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}

/**
 * Creates a new instance backed by a socket opened from the given {@link SelectorProvider}.
 *
 * @param provider the provider used to open the underlying socket channel
 */
public NioSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}

/**
 * Creates a new instance without a parent channel around an existing JDK channel.
 *
 * @param socket the JDK channel to wrap
 */
public NioSocketChannel(SocketChannel socket) {
    this(null, socket);
}

/**
 * Creates a new instance around an existing JDK channel.
 *
 * @param parent the channel that created this instance, or {@code null} when there is none
 * @param socket the JDK channel to wrap
 */
public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    final Socket javaSocket = socket.socket();
    config = new NioSocketChannelConfig(this, javaSocket);
}
108
109 @Override
110 public ServerSocketChannel parent() {
111 return (ServerSocketChannel) super.parent();
112 }
113
114 @Override
115 public SocketChannelConfig config() {
116 return config;
117 }
118
119 @Override
120 protected SocketChannel javaChannel() {
121 return (SocketChannel) super.javaChannel();
122 }
123
124 @Override
125 public boolean isActive() {
126 SocketChannel ch = javaChannel();
127 return ch.isOpen() && ch.isConnected();
128 }
129
130 @Override
131 public boolean isOutputShutdown() {
132 return javaChannel().socket().isOutputShutdown() || !isActive();
133 }
134
135 @Override
136 public boolean isInputShutdown() {
137 return javaChannel().socket().isInputShutdown() || !isActive();
138 }
139
140 @Override
141 public boolean isShutdown() {
142 Socket socket = javaChannel().socket();
143 return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive();
144 }
145
146 @Override
147 public InetSocketAddress localAddress() {
148 return (InetSocketAddress) super.localAddress();
149 }
150
151 @Override
152 public InetSocketAddress remoteAddress() {
153 return (InetSocketAddress) super.remoteAddress();
154 }
155
156 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
157 @UnstableApi
158 @Override
159 protected final void doShutdownOutput() throws Exception {
160 if (PlatformDependent.javaVersion() >= 7) {
161 javaChannel().shutdownOutput();
162 } else {
163 javaChannel().socket().shutdownOutput();
164 }
165 }
166
167 @Override
168 public ChannelFuture shutdownOutput() {
169 return shutdownOutput(newPromise());
170 }
171
172 @Override
173 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
174 final EventLoop loop = eventLoop();
175 if (loop.inEventLoop()) {
176 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
177 } else {
178 loop.execute(new Runnable() {
179 @Override
180 public void run() {
181 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
182 }
183 });
184 }
185 return promise;
186 }
187
188 @Override
189 public ChannelFuture shutdownInput() {
190 return shutdownInput(newPromise());
191 }
192
193 @Override
194 protected boolean isInputShutdown0() {
195 return isInputShutdown();
196 }
197
198 @Override
199 public ChannelFuture shutdownInput(final ChannelPromise promise) {
200 EventLoop loop = eventLoop();
201 if (loop.inEventLoop()) {
202 shutdownInput0(promise);
203 } else {
204 loop.execute(new Runnable() {
205 @Override
206 public void run() {
207 shutdownInput0(promise);
208 }
209 });
210 }
211 return promise;
212 }
213
214 @Override
215 public ChannelFuture shutdown() {
216 return shutdown(newPromise());
217 }
218
219 @Override
220 public ChannelFuture shutdown(final ChannelPromise promise) {
221 ChannelFuture shutdownOutputFuture = shutdownOutput();
222 if (shutdownOutputFuture.isDone()) {
223 shutdownOutputDone(shutdownOutputFuture, promise);
224 } else {
225 shutdownOutputFuture.addListener(new ChannelFutureListener() {
226 @Override
227 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
228 shutdownOutputDone(shutdownOutputFuture, promise);
229 }
230 });
231 }
232 return promise;
233 }
234
235 private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
236 ChannelFuture shutdownInputFuture = shutdownInput();
237 if (shutdownInputFuture.isDone()) {
238 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
239 } else {
240 shutdownInputFuture.addListener(new ChannelFutureListener() {
241 @Override
242 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
243 shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
244 }
245 });
246 }
247 }
248
249 private static void shutdownDone(ChannelFuture shutdownOutputFuture,
250 ChannelFuture shutdownInputFuture,
251 ChannelPromise promise) {
252 Throwable shutdownOutputCause = shutdownOutputFuture.cause();
253 Throwable shutdownInputCause = shutdownInputFuture.cause();
254 if (shutdownOutputCause != null) {
255 if (shutdownInputCause != null) {
256 logger.debug("Exception suppressed because a previous exception occurred.",
257 shutdownInputCause);
258 }
259 promise.setFailure(shutdownOutputCause);
260 } else if (shutdownInputCause != null) {
261 promise.setFailure(shutdownInputCause);
262 } else {
263 promise.setSuccess();
264 }
265 }
266 private void shutdownInput0(final ChannelPromise promise) {
267 try {
268 shutdownInput0();
269 promise.setSuccess();
270 } catch (Throwable t) {
271 promise.setFailure(t);
272 }
273 }
274
275 @SuppressJava6Requirement(reason = "Usage guarded by java version check")
276 private void shutdownInput0() throws Exception {
277 if (PlatformDependent.javaVersion() >= 7) {
278 javaChannel().shutdownInput();
279 } else {
280 javaChannel().socket().shutdownInput();
281 }
282 }
283
284 @Override
285 protected SocketAddress localAddress0() {
286 return javaChannel().socket().getLocalSocketAddress();
287 }
288
289 @Override
290 protected SocketAddress remoteAddress0() {
291 return javaChannel().socket().getRemoteSocketAddress();
292 }
293
294 @Override
295 protected void doBind(SocketAddress localAddress) throws Exception {
296 doBind0(localAddress);
297 }
298
299 private void doBind0(SocketAddress localAddress) throws Exception {
300 if (PlatformDependent.javaVersion() >= 7) {
301 SocketUtils.bind(javaChannel(), localAddress);
302 } else {
303 SocketUtils.bind(javaChannel().socket(), localAddress);
304 }
305 }
306
307 @Override
308 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
309 if (localAddress != null) {
310 doBind0(localAddress);
311 }
312
313 boolean success = false;
314 try {
315 boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
316 if (!connected) {
317 selectionKey().interestOps(SelectionKey.OP_CONNECT);
318 }
319 success = true;
320 return connected;
321 } finally {
322 if (!success) {
323 doClose();
324 }
325 }
326 }
327
328 @Override
329 protected void doFinishConnect() throws Exception {
330 if (!javaChannel().finishConnect()) {
331 throw new Error();
332 }
333 }
334
335 @Override
336 protected void doDisconnect() throws Exception {
337 doClose();
338 }
339
340 @Override
341 protected void doClose() throws Exception {
342 super.doClose();
343 javaChannel().close();
344 }
345
346 @Override
347 protected int doReadBytes(ByteBuf byteBuf) throws Exception {
348 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
349 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
350 return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
351 }
352
353 @Override
354 protected int doWriteBytes(ByteBuf buf) throws Exception {
355 final int expectedWrittenBytes = buf.readableBytes();
356 return buf.readBytes(javaChannel(), expectedWrittenBytes);
357 }
358
359 @Override
360 protected long doWriteFileRegion(FileRegion region) throws Exception {
361 final long position = region.transferred();
362 return region.transferTo(javaChannel(), position);
363 }
364
365 private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
366
367
368
369 if (attempted == written) {
370 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
371 ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
372 }
373 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
374 ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
375 }
376 }
377
378 @Override
379 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
380 SocketChannel ch = javaChannel();
381 int writeSpinCount = config().getWriteSpinCount();
382 do {
383 if (in.isEmpty()) {
384
385 clearOpWrite();
386
387 return;
388 }
389
390
391 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
392 ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
393 int nioBufferCnt = in.nioBufferCount();
394
395
396
397 switch (nioBufferCnt) {
398 case 0:
399
400 writeSpinCount -= doWrite0(in);
401 break;
402 case 1: {
403
404
405
406 ByteBuffer buffer = nioBuffers[0];
407 int attemptedBytes = buffer.remaining();
408 final int localWrittenBytes = ch.write(buffer);
409 if (localWrittenBytes <= 0) {
410 incompleteWrite(true);
411 return;
412 }
413 adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
414 in.removeBytes(localWrittenBytes);
415 --writeSpinCount;
416 break;
417 }
418 default: {
419
420
421
422 long attemptedBytes = in.nioBufferSize();
423 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
424 if (localWrittenBytes <= 0) {
425 incompleteWrite(true);
426 return;
427 }
428
429 adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
430 maxBytesPerGatheringWrite);
431 in.removeBytes(localWrittenBytes);
432 --writeSpinCount;
433 break;
434 }
435 }
436 } while (writeSpinCount > 0);
437
438 incompleteWrite(writeSpinCount < 0);
439 }
440
441 @Override
442 protected AbstractNioUnsafe newUnsafe() {
443 return new NioSocketChannelUnsafe();
444 }
445
446 private final class NioSocketChannelUnsafe extends NioByteUnsafe {
447 @Override
448 protected Executor prepareToClose() {
449 try {
450 if (javaChannel().isOpen() && config().getSoLinger() > 0) {
451
452
453
454
455 doDeregister();
456 return GlobalEventExecutor.INSTANCE;
457 }
458 } catch (Throwable ignore) {
459
460
461
462 }
463 return null;
464 }
465 }
466
467 private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
468 private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE;
469 private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
470 super(channel, javaSocket);
471 calculateMaxBytesPerGatheringWrite();
472 }
473
474 @Override
475 protected void autoReadCleared() {
476 clearReadPending();
477 }
478
479 @Override
480 public NioSocketChannelConfig setSendBufferSize(int sendBufferSize) {
481 super.setSendBufferSize(sendBufferSize);
482 calculateMaxBytesPerGatheringWrite();
483 return this;
484 }
485
486 @Override
487 public <T> boolean setOption(ChannelOption<T> option, T value) {
488 if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
489 return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
490 }
491 return super.setOption(option, value);
492 }
493
494 @Override
495 public <T> T getOption(ChannelOption<T> option) {
496 if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
497 return NioChannelOption.getOption(jdkChannel(), (NioChannelOption<T>) option);
498 }
499 return super.getOption(option);
500 }
501
502 @Override
503 public Map<ChannelOption<?>, Object> getOptions() {
504 if (PlatformDependent.javaVersion() >= 7) {
505 return getOptions(super.getOptions(), NioChannelOption.getOptions(jdkChannel()));
506 }
507 return super.getOptions();
508 }
509
510 void setMaxBytesPerGatheringWrite(int maxBytesPerGatheringWrite) {
511 this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
512 }
513
514 int getMaxBytesPerGatheringWrite() {
515 return maxBytesPerGatheringWrite;
516 }
517
518 private void calculateMaxBytesPerGatheringWrite() {
519
520 int newSendBufferSize = getSendBufferSize() << 1;
521 if (newSendBufferSize > 0) {
522 setMaxBytesPerGatheringWrite(newSendBufferSize);
523 }
524 }
525
526 private SocketChannel jdkChannel() {
527 return ((NioSocketChannel) channel).javaChannel();
528 }
529 }
530 }