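// Navigation text from the source-browser page: View this class's API documentation | Back to source home | 52im.net - Instant Messaging Developer Community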
1   /*
2    * Copyright 2011 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.sctp.nio;
17  
18  import com.sun.nio.sctp.Association;
19  import com.sun.nio.sctp.MessageInfo;
20  import com.sun.nio.sctp.NotificationHandler;
21  import com.sun.nio.sctp.SctpChannel;
22  import io.netty.buffer.ByteBuf;
23  import io.netty.buffer.ByteBufAllocator;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.channel.RecvByteBufAllocator;
31  import io.netty.channel.nio.AbstractNioMessageChannel;
32  import io.netty.channel.sctp.DefaultSctpChannelConfig;
33  import io.netty.channel.sctp.SctpChannelConfig;
34  import io.netty.channel.sctp.SctpMessage;
35  import io.netty.channel.sctp.SctpNotificationHandler;
36  import io.netty.channel.sctp.SctpServerChannel;
37  import io.netty.util.internal.PlatformDependent;
38  import io.netty.util.internal.StringUtil;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.io.IOException;
43  import java.net.InetAddress;
44  import java.net.InetSocketAddress;
45  import java.net.SocketAddress;
46  import java.nio.ByteBuffer;
47  import java.nio.channels.SelectionKey;
48  import java.util.Collections;
49  import java.util.HashSet;
50  import java.util.Iterator;
51  import java.util.LinkedHashSet;
52  import java.util.List;
53  import java.util.Set;
54  
55  /**
56   * {@link io.netty.channel.sctp.SctpChannel} implementation which use non-blocking mode and allows to read /
57   * write {@link SctpMessage}s to the underlying {@link SctpChannel}.
58   *
59   * Be aware that not all operations systems support SCTP. Please refer to the documentation of your operation system,
60   * to understand what you need to do to use it. Also this feature is only supported on Java 7+.
61   */
62  public class NioSctpChannel extends AbstractNioMessageChannel implements io.netty.channel.sctp.SctpChannel {
63      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
64  
65      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSctpChannel.class);
66  
67      private final SctpChannelConfig config;
68  
69      private final NotificationHandler<?> notificationHandler;
70  
71      private static SctpChannel newSctpChannel() {
72          try {
73              return SctpChannel.open();
74          } catch (IOException e) {
75              throw new ChannelException("Failed to open a sctp channel.", e);
76          }
77      }
78  
79      /**
80       * Create a new instance
81       */
82      public NioSctpChannel() {
83          this(newSctpChannel());
84      }
85  
86      /**
87       * Create a new instance using {@link SctpChannel}
88       */
89      public NioSctpChannel(SctpChannel sctpChannel) {
90          this(null, sctpChannel);
91      }
92  
93      /**
94       * Create a new instance
95       *
96       * @param parent        the {@link Channel} which is the parent of this {@link NioSctpChannel}
97       *                      or {@code null}.
98       * @param sctpChannel   the underlying {@link SctpChannel}
99       */
100     public NioSctpChannel(Channel parent, SctpChannel sctpChannel) {
101         super(parent, sctpChannel, SelectionKey.OP_READ);
102         try {
103             sctpChannel.configureBlocking(false);
104             config = new NioSctpChannelConfig(this, sctpChannel);
105             notificationHandler = new SctpNotificationHandler(this);
106         } catch (IOException e) {
107             try {
108                 sctpChannel.close();
109             } catch (IOException e2) {
110                 if (logger.isWarnEnabled()) {
111                     logger.warn(
112                             "Failed to close a partially initialized sctp channel.", e2);
113                 }
114             }
115 
116             throw new ChannelException("Failed to enter non-blocking mode.", e);
117         }
118     }
119 
120     @Override
121     public InetSocketAddress localAddress() {
122         return (InetSocketAddress) super.localAddress();
123     }
124 
125     @Override
126     public InetSocketAddress remoteAddress() {
127         return (InetSocketAddress) super.remoteAddress();
128     }
129 
130     @Override
131     public SctpServerChannel parent() {
132         return (SctpServerChannel) super.parent();
133     }
134 
135     @Override
136     public ChannelMetadata metadata() {
137         return METADATA;
138     }
139 
140     @Override
141     public Association association() {
142         try {
143             return javaChannel().association();
144         } catch (IOException ignored) {
145             return null;
146         }
147     }
148 
149     @Override
150     public Set<InetSocketAddress> allLocalAddresses() {
151         try {
152             final Set<SocketAddress> allLocalAddresses = javaChannel().getAllLocalAddresses();
153             final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
154             for (SocketAddress socketAddress : allLocalAddresses) {
155                 addresses.add((InetSocketAddress) socketAddress);
156             }
157             return addresses;
158         } catch (Throwable ignored) {
159             return Collections.emptySet();
160         }
161     }
162 
163     @Override
164     public SctpChannelConfig config() {
165         return config;
166     }
167 
168     @Override
169     public Set<InetSocketAddress> allRemoteAddresses() {
170         try {
171             final Set<SocketAddress> allLocalAddresses = javaChannel().getRemoteAddresses();
172             final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
173             for (SocketAddress socketAddress : allLocalAddresses) {
174                 addresses.add((InetSocketAddress) socketAddress);
175             }
176             return addresses;
177         } catch (Throwable ignored) {
178             return Collections.emptySet();
179         }
180     }
181 
182     @Override
183     protected SctpChannel javaChannel() {
184         return (SctpChannel) super.javaChannel();
185     }
186 
187     @Override
188     public boolean isActive() {
189         SctpChannel ch = javaChannel();
190         return ch.isOpen() && association() != null;
191     }
192 
193     @Override
194     protected SocketAddress localAddress0() {
195         try {
196             Iterator<SocketAddress> i = javaChannel().getAllLocalAddresses().iterator();
197             if (i.hasNext()) {
198                 return i.next();
199             }
200         } catch (IOException e) {
201             // ignore
202         }
203         return null;
204     }
205 
206     @Override
207     protected SocketAddress remoteAddress0() {
208         try {
209             Iterator<SocketAddress> i = javaChannel().getRemoteAddresses().iterator();
210             if (i.hasNext()) {
211                 return i.next();
212             }
213         } catch (IOException e) {
214             // ignore
215         }
216         return null;
217     }
218 
219     @Override
220     protected void doBind(SocketAddress localAddress) throws Exception {
221         javaChannel().bind(localAddress);
222     }
223 
224     @Override
225     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
226         if (localAddress != null) {
227             javaChannel().bind(localAddress);
228         }
229 
230         boolean success = false;
231         try {
232             boolean connected = javaChannel().connect(remoteAddress);
233             if (!connected) {
234                 selectionKey().interestOps(SelectionKey.OP_CONNECT);
235             }
236             success = true;
237             return connected;
238         } finally {
239             if (!success) {
240                 doClose();
241             }
242         }
243     }
244 
245     @Override
246     protected void doFinishConnect() throws Exception {
247         if (!javaChannel().finishConnect()) {
248             throw new Error();
249         }
250     }
251 
252     @Override
253     protected void doDisconnect() throws Exception {
254         doClose();
255     }
256 
257     @Override
258     protected void doClose() throws Exception {
259         javaChannel().close();
260     }
261 
262     @Override
263     protected int doReadMessages(List<Object> buf) throws Exception {
264         SctpChannel ch = javaChannel();
265 
266         RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
267         ByteBuf buffer = allocHandle.allocate(config().getAllocator());
268         boolean free = true;
269         try {
270             ByteBuffer data = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
271             int pos = data.position();
272 
273             MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
274             if (messageInfo == null) {
275                 return 0;
276             }
277 
278             allocHandle.lastBytesRead(data.position() - pos);
279             buf.add(new SctpMessage(messageInfo,
280                     buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
281             free = false;
282             return 1;
283         } catch (Throwable cause) {
284             PlatformDependent.throwException(cause);
285             return -1;
286         }  finally {
287             if (free) {
288                 buffer.release();
289             }
290         }
291     }
292 
293     @Override
294     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
295         SctpMessage packet = (SctpMessage) msg;
296         ByteBuf data = packet.content();
297         int dataLen = data.readableBytes();
298         if (dataLen == 0) {
299             return true;
300         }
301 
302         ByteBufAllocator alloc = alloc();
303         boolean needsCopy = data.nioBufferCount() != 1;
304         if (!needsCopy) {
305             if (!data.isDirect() && alloc.isDirectBufferPooled()) {
306                 needsCopy = true;
307             }
308         }
309         ByteBuffer nioData;
310         if (needsCopy) {
311             data = alloc.directBuffer(dataLen).writeBytes(data);
312         }
313         nioData = data.nioBuffer();
314         final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
315         mi.payloadProtocolID(packet.protocolIdentifier());
316         mi.streamNumber(packet.streamIdentifier());
317         mi.unordered(packet.isUnordered());
318 
319         final int writtenBytes = javaChannel().send(nioData, mi);
320         return writtenBytes > 0;
321     }
322 
323     @Override
324     protected final Object filterOutboundMessage(Object msg) throws Exception {
325         if (msg instanceof SctpMessage) {
326             SctpMessage m = (SctpMessage) msg;
327             ByteBuf buf = m.content();
328             if (buf.isDirect() && buf.nioBufferCount() == 1) {
329                 return m;
330             }
331 
332             return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(),
333                                    newDirectBuffer(m, buf));
334         }
335 
336         throw new UnsupportedOperationException(
337                 "unsupported message type: " + StringUtil.simpleClassName(msg) +
338                 " (expected: " + StringUtil.simpleClassName(SctpMessage.class));
339     }
340 
341     @Override
342     public ChannelFuture bindAddress(InetAddress localAddress) {
343         return bindAddress(localAddress, newPromise());
344     }
345 
346     @Override
347     public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
348         if (eventLoop().inEventLoop()) {
349             try {
350                 javaChannel().bindAddress(localAddress);
351                 promise.setSuccess();
352             } catch (Throwable t) {
353                 promise.setFailure(t);
354             }
355         } else {
356             eventLoop().execute(new Runnable() {
357                 @Override
358                 public void run() {
359                     bindAddress(localAddress, promise);
360                 }
361             });
362         }
363         return promise;
364     }
365 
366     @Override
367     public ChannelFuture unbindAddress(InetAddress localAddress) {
368         return unbindAddress(localAddress, newPromise());
369     }
370 
371     @Override
372     public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
373         if (eventLoop().inEventLoop()) {
374             try {
375                 javaChannel().unbindAddress(localAddress);
376                 promise.setSuccess();
377             } catch (Throwable t) {
378                 promise.setFailure(t);
379             }
380         } else {
381             eventLoop().execute(new Runnable() {
382                 @Override
383                 public void run() {
384                     unbindAddress(localAddress, promise);
385                 }
386             });
387         }
388         return promise;
389     }
390 
391     private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
392         private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
393             super(channel, javaChannel);
394         }
395 
396         @Override
397         protected void autoReadCleared() {
398             clearReadPending();
399         }
400     }
401 }