查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.nio.AbstractNioByteChannel;
30  import io.netty.channel.socket.DefaultSocketChannelConfig;
31  import io.netty.channel.socket.ServerSocketChannel;
32  import io.netty.channel.socket.SocketChannelConfig;
33  import io.netty.util.concurrent.GlobalEventExecutor;
34  import io.netty.util.internal.PlatformDependent;
35  import io.netty.util.internal.SocketUtils;
36  import io.netty.util.internal.SuppressJava6Requirement;
37  import io.netty.util.internal.UnstableApi;
38  import io.netty.util.internal.logging.InternalLogger;
39  import io.netty.util.internal.logging.InternalLoggerFactory;
40  
41  import java.io.IOException;
42  import java.net.InetSocketAddress;
43  import java.net.Socket;
44  import java.net.SocketAddress;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.SelectionKey;
47  import java.nio.channels.SocketChannel;
48  import java.nio.channels.spi.SelectorProvider;
49  import java.util.Map;
50  import java.util.concurrent.Executor;
51  
52  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
53  
54  /**
55   * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
56   */
57  public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
58      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
59      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
60  
61      private static SocketChannel newSocket(SelectorProvider provider) {
62          try {
63              /**
64               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
65               *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
66               *
67               *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
68               */
69              return provider.openSocketChannel();
70          } catch (IOException e) {
71              throw new ChannelException("Failed to open a socket.", e);
72          }
73      }
74  
75      private final SocketChannelConfig config;
76  
77      /**
78       * Create a new instance
79       */
80      public NioSocketChannel() {
81          this(DEFAULT_SELECTOR_PROVIDER);
82      }
83  
84      /**
85       * Create a new instance using the given {@link SelectorProvider}.
86       */
87      public NioSocketChannel(SelectorProvider provider) {
88          this(newSocket(provider));
89      }
90  
91      /**
92       * Create a new instance using the given {@link SocketChannel}.
93       */
94      public NioSocketChannel(SocketChannel socket) {
95          this(null, socket);
96      }
97  
98      /**
99       * Create a new instance
100      *
101      * @param parent    the {@link Channel} which created this instance or {@code null} if it was created by the user
102      * @param socket    the {@link SocketChannel} which will be used
103      */
104     public NioSocketChannel(Channel parent, SocketChannel socket) {
105         super(parent, socket);
106         config = new NioSocketChannelConfig(this, socket.socket());
107     }
108 
109     @Override
110     public ServerSocketChannel parent() {
111         return (ServerSocketChannel) super.parent();
112     }
113 
114     @Override
115     public SocketChannelConfig config() {
116         return config;
117     }
118 
119     @Override
120     protected SocketChannel javaChannel() {
121         return (SocketChannel) super.javaChannel();
122     }
123 
124     @Override
125     public boolean isActive() {
126         SocketChannel ch = javaChannel();
127         return ch.isOpen() && ch.isConnected();
128     }
129 
130     @Override
131     public boolean isOutputShutdown() {
132         return javaChannel().socket().isOutputShutdown() || !isActive();
133     }
134 
135     @Override
136     public boolean isInputShutdown() {
137         return javaChannel().socket().isInputShutdown() || !isActive();
138     }
139 
140     @Override
141     public boolean isShutdown() {
142         Socket socket = javaChannel().socket();
143         return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive();
144     }
145 
146     @Override
147     public InetSocketAddress localAddress() {
148         return (InetSocketAddress) super.localAddress();
149     }
150 
151     @Override
152     public InetSocketAddress remoteAddress() {
153         return (InetSocketAddress) super.remoteAddress();
154     }
155 
156     @SuppressJava6Requirement(reason = "Usage guarded by java version check")
157     @UnstableApi
158     @Override
159     protected final void doShutdownOutput() throws Exception {
160         if (PlatformDependent.javaVersion() >= 7) {
161             javaChannel().shutdownOutput();
162         } else {
163             javaChannel().socket().shutdownOutput();
164         }
165     }
166 
167     @Override
168     public ChannelFuture shutdownOutput() {
169         return shutdownOutput(newPromise());
170     }
171 
172     @Override
173     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
174         final EventLoop loop = eventLoop();
175         if (loop.inEventLoop()) {
176             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
177         } else {
178             loop.execute(new Runnable() {
179                 @Override
180                 public void run() {
181                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
182                 }
183             });
184         }
185         return promise;
186     }
187 
188     @Override
189     public ChannelFuture shutdownInput() {
190         return shutdownInput(newPromise());
191     }
192 
193     @Override
194     protected boolean isInputShutdown0() {
195         return isInputShutdown();
196     }
197 
198     @Override
199     public ChannelFuture shutdownInput(final ChannelPromise promise) {
200         EventLoop loop = eventLoop();
201         if (loop.inEventLoop()) {
202             shutdownInput0(promise);
203         } else {
204             loop.execute(new Runnable() {
205                 @Override
206                 public void run() {
207                     shutdownInput0(promise);
208                 }
209             });
210         }
211         return promise;
212     }
213 
214     @Override
215     public ChannelFuture shutdown() {
216         return shutdown(newPromise());
217     }
218 
219     @Override
220     public ChannelFuture shutdown(final ChannelPromise promise) {
221         ChannelFuture shutdownOutputFuture = shutdownOutput();
222         if (shutdownOutputFuture.isDone()) {
223             shutdownOutputDone(shutdownOutputFuture, promise);
224         } else {
225             shutdownOutputFuture.addListener(new ChannelFutureListener() {
226                 @Override
227                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
228                     shutdownOutputDone(shutdownOutputFuture, promise);
229                 }
230             });
231         }
232         return promise;
233     }
234 
235     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
236         ChannelFuture shutdownInputFuture = shutdownInput();
237         if (shutdownInputFuture.isDone()) {
238             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
239         } else {
240             shutdownInputFuture.addListener(new ChannelFutureListener() {
241                 @Override
242                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
243                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
244                 }
245             });
246         }
247     }
248 
249     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
250                                      ChannelFuture shutdownInputFuture,
251                                      ChannelPromise promise) {
252         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
253         Throwable shutdownInputCause = shutdownInputFuture.cause();
254         if (shutdownOutputCause != null) {
255             if (shutdownInputCause != null) {
256                 logger.debug("Exception suppressed because a previous exception occurred.",
257                         shutdownInputCause);
258             }
259             promise.setFailure(shutdownOutputCause);
260         } else if (shutdownInputCause != null) {
261             promise.setFailure(shutdownInputCause);
262         } else {
263             promise.setSuccess();
264         }
265     }
266     private void shutdownInput0(final ChannelPromise promise) {
267         try {
268             shutdownInput0();
269             promise.setSuccess();
270         } catch (Throwable t) {
271             promise.setFailure(t);
272         }
273     }
274 
275     @SuppressJava6Requirement(reason = "Usage guarded by java version check")
276     private void shutdownInput0() throws Exception {
277         if (PlatformDependent.javaVersion() >= 7) {
278             javaChannel().shutdownInput();
279         } else {
280             javaChannel().socket().shutdownInput();
281         }
282     }
283 
284     @Override
285     protected SocketAddress localAddress0() {
286         return javaChannel().socket().getLocalSocketAddress();
287     }
288 
289     @Override
290     protected SocketAddress remoteAddress0() {
291         return javaChannel().socket().getRemoteSocketAddress();
292     }
293 
294     @Override
295     protected void doBind(SocketAddress localAddress) throws Exception {
296         doBind0(localAddress);
297     }
298 
299     private void doBind0(SocketAddress localAddress) throws Exception {
300         if (PlatformDependent.javaVersion() >= 7) {
301             SocketUtils.bind(javaChannel(), localAddress);
302         } else {
303             SocketUtils.bind(javaChannel().socket(), localAddress);
304         }
305     }
306 
307     @Override
308     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
309         if (localAddress != null) {
310             doBind0(localAddress);
311         }
312 
313         boolean success = false;
314         try {
315             boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
316             if (!connected) {
317                 selectionKey().interestOps(SelectionKey.OP_CONNECT);
318             }
319             success = true;
320             return connected;
321         } finally {
322             if (!success) {
323                 doClose();
324             }
325         }
326     }
327 
328     @Override
329     protected void doFinishConnect() throws Exception {
330         if (!javaChannel().finishConnect()) {
331             throw new Error();
332         }
333     }
334 
335     @Override
336     protected void doDisconnect() throws Exception {
337         doClose();
338     }
339 
340     @Override
341     protected void doClose() throws Exception {
342         super.doClose();
343         javaChannel().close();
344     }
345 
346     @Override
347     protected int doReadBytes(ByteBuf byteBuf) throws Exception {
348         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
349         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
350         return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
351     }
352 
353     @Override
354     protected int doWriteBytes(ByteBuf buf) throws Exception {
355         final int expectedWrittenBytes = buf.readableBytes();
356         return buf.readBytes(javaChannel(), expectedWrittenBytes);
357     }
358 
359     @Override
360     protected long doWriteFileRegion(FileRegion region) throws Exception {
361         final long position = region.transferred();
362         return region.transferTo(javaChannel(), position);
363     }
364 
365     private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
366         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
367         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
368         // make a best effort to adjust as OS behavior changes.
369         if (attempted == written) {
370             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
371                 ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
372             }
373         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
374             ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
375         }
376     }
377 
378     @Override
379     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
380         SocketChannel ch = javaChannel();
381         int writeSpinCount = config().getWriteSpinCount();
382         do {
383             if (in.isEmpty()) {
384                 // All written so clear OP_WRITE
385                 clearOpWrite();
386                 // Directly return here so incompleteWrite(...) is not called.
387                 return;
388             }
389 
390             // Ensure the pending writes are made of ByteBufs only.
391             int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
392             ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
393             int nioBufferCnt = in.nioBufferCount();
394 
395             // Always use nioBuffers() to workaround data-corruption.
396             // See https://github.com/netty/netty/issues/2761
397             switch (nioBufferCnt) {
398                 case 0:
399                     // We have something else beside ByteBuffers to write so fallback to normal writes.
400                     writeSpinCount -= doWrite0(in);
401                     break;
402                 case 1: {
403                     // Only one ByteBuf so use non-gathering write
404                     // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
405                     // to check if the total size of all the buffers is non-zero.
406                     ByteBuffer buffer = nioBuffers[0];
407                     int attemptedBytes = buffer.remaining();
408                     final int localWrittenBytes = ch.write(buffer);
409                     if (localWrittenBytes <= 0) {
410                         incompleteWrite(true);
411                         return;
412                     }
413                     adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
414                     in.removeBytes(localWrittenBytes);
415                     --writeSpinCount;
416                     break;
417                 }
418                 default: {
419                     // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
420                     // to check if the total size of all the buffers is non-zero.
421                     // We limit the max amount to int above so cast is safe
422                     long attemptedBytes = in.nioBufferSize();
423                     final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
424                     if (localWrittenBytes <= 0) {
425                         incompleteWrite(true);
426                         return;
427                     }
428                     // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
429                     adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
430                             maxBytesPerGatheringWrite);
431                     in.removeBytes(localWrittenBytes);
432                     --writeSpinCount;
433                     break;
434                 }
435             }
436         } while (writeSpinCount > 0);
437 
438         incompleteWrite(writeSpinCount < 0);
439     }
440 
441     @Override
442     protected AbstractNioUnsafe newUnsafe() {
443         return new NioSocketChannelUnsafe();
444     }
445 
446     private final class NioSocketChannelUnsafe extends NioByteUnsafe {
447         @Override
448         protected Executor prepareToClose() {
449             try {
450                 if (javaChannel().isOpen() && config().getSoLinger() > 0) {
451                     // We need to cancel this key of the channel so we may not end up in a eventloop spin
452                     // because we try to read or write until the actual close happens which may be later due
453                     // SO_LINGER handling.
454                     // See https://github.com/netty/netty/issues/4449
455                     doDeregister();
456                     return GlobalEventExecutor.INSTANCE;
457                 }
458             } catch (Throwable ignore) {
459                 // Ignore the error as the underlying channel may be closed in the meantime and so
460                 // getSoLinger() may produce an exception. In this case we just return null.
461                 // See https://github.com/netty/netty/issues/4449
462             }
463             return null;
464         }
465     }
466 
467     private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
468         private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE;
469         private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
470             super(channel, javaSocket);
471             calculateMaxBytesPerGatheringWrite();
472         }
473 
474         @Override
475         protected void autoReadCleared() {
476             clearReadPending();
477         }
478 
479         @Override
480         public NioSocketChannelConfig setSendBufferSize(int sendBufferSize) {
481             super.setSendBufferSize(sendBufferSize);
482             calculateMaxBytesPerGatheringWrite();
483             return this;
484         }
485 
486         @Override
487         public <T> boolean setOption(ChannelOption<T> option, T value) {
488             if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
489                 return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
490             }
491             return super.setOption(option, value);
492         }
493 
494         @Override
495         public <T> T getOption(ChannelOption<T> option) {
496             if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
497                 return NioChannelOption.getOption(jdkChannel(), (NioChannelOption<T>) option);
498             }
499             return super.getOption(option);
500         }
501 
502         @Override
503         public Map<ChannelOption<?>, Object> getOptions() {
504             if (PlatformDependent.javaVersion() >= 7) {
505                 return getOptions(super.getOptions(), NioChannelOption.getOptions(jdkChannel()));
506             }
507             return super.getOptions();
508         }
509 
510         void setMaxBytesPerGatheringWrite(int maxBytesPerGatheringWrite) {
511             this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
512         }
513 
514         int getMaxBytesPerGatheringWrite() {
515             return maxBytesPerGatheringWrite;
516         }
517 
518         private void calculateMaxBytesPerGatheringWrite() {
519             // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
520             int newSendBufferSize = getSendBufferSize() << 1;
521             if (newSendBufferSize > 0) {
522                 setMaxBytesPerGatheringWrite(newSendBufferSize);
523             }
524         }
525 
526         private SocketChannel jdkChannel() {
527             return ((NioSocketChannel) channel).javaChannel();
528         }
529     }
530 }