1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.sctp.nio;
17
18 import com.sun.nio.sctp.Association;
19 import com.sun.nio.sctp.MessageInfo;
20 import com.sun.nio.sctp.NotificationHandler;
21 import com.sun.nio.sctp.SctpChannel;
22 import io.netty.buffer.ByteBuf;
23 import io.netty.buffer.ByteBufAllocator;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.channel.RecvByteBufAllocator;
31 import io.netty.channel.nio.AbstractNioMessageChannel;
32 import io.netty.channel.sctp.DefaultSctpChannelConfig;
33 import io.netty.channel.sctp.SctpChannelConfig;
34 import io.netty.channel.sctp.SctpMessage;
35 import io.netty.channel.sctp.SctpNotificationHandler;
36 import io.netty.channel.sctp.SctpServerChannel;
37 import io.netty.util.internal.PlatformDependent;
38 import io.netty.util.internal.StringUtil;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.io.IOException;
43 import java.net.InetAddress;
44 import java.net.InetSocketAddress;
45 import java.net.SocketAddress;
46 import java.nio.ByteBuffer;
47 import java.nio.channels.SelectionKey;
48 import java.util.Collections;
49 import java.util.HashSet;
50 import java.util.Iterator;
51 import java.util.LinkedHashSet;
52 import java.util.List;
53 import java.util.Set;
54
55
56
57
58
59
60
61
62 public class NioSctpChannel extends AbstractNioMessageChannel implements io.netty.channel.sctp.SctpChannel {
63 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
64
65 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSctpChannel.class);
66
67 private final SctpChannelConfig config;
68
69 private final NotificationHandler<?> notificationHandler;
70
71 private static SctpChannel newSctpChannel() {
72 try {
73 return SctpChannel.open();
74 } catch (IOException e) {
75 throw new ChannelException("Failed to open a sctp channel.", e);
76 }
77 }
78
79
80
81
82 public NioSctpChannel() {
83 this(newSctpChannel());
84 }
85
86
87
88
89 public NioSctpChannel(SctpChannel sctpChannel) {
90 this(null, sctpChannel);
91 }
92
93
94
95
96
97
98
99
100 public NioSctpChannel(Channel parent, SctpChannel sctpChannel) {
101 super(parent, sctpChannel, SelectionKey.OP_READ);
102 try {
103 sctpChannel.configureBlocking(false);
104 config = new NioSctpChannelConfig(this, sctpChannel);
105 notificationHandler = new SctpNotificationHandler(this);
106 } catch (IOException e) {
107 try {
108 sctpChannel.close();
109 } catch (IOException e2) {
110 if (logger.isWarnEnabled()) {
111 logger.warn(
112 "Failed to close a partially initialized sctp channel.", e2);
113 }
114 }
115
116 throw new ChannelException("Failed to enter non-blocking mode.", e);
117 }
118 }
119
120 @Override
121 public InetSocketAddress localAddress() {
122 return (InetSocketAddress) super.localAddress();
123 }
124
125 @Override
126 public InetSocketAddress remoteAddress() {
127 return (InetSocketAddress) super.remoteAddress();
128 }
129
130 @Override
131 public SctpServerChannel parent() {
132 return (SctpServerChannel) super.parent();
133 }
134
135 @Override
136 public ChannelMetadata metadata() {
137 return METADATA;
138 }
139
140 @Override
141 public Association association() {
142 try {
143 return javaChannel().association();
144 } catch (IOException ignored) {
145 return null;
146 }
147 }
148
149 @Override
150 public Set<InetSocketAddress> allLocalAddresses() {
151 try {
152 final Set<SocketAddress> allLocalAddresses = javaChannel().getAllLocalAddresses();
153 final Set<InetSocketAddress> addresses = new LinkedHashSet<InetSocketAddress>(allLocalAddresses.size());
154 for (SocketAddress socketAddress : allLocalAddresses) {
155 addresses.add((InetSocketAddress) socketAddress);
156 }
157 return addresses;
158 } catch (Throwable ignored) {
159 return Collections.emptySet();
160 }
161 }
162
163 @Override
164 public SctpChannelConfig config() {
165 return config;
166 }
167
168 @Override
169 public Set<InetSocketAddress> allRemoteAddresses() {
170 try {
171 final Set<SocketAddress> allLocalAddresses = javaChannel().getRemoteAddresses();
172 final Set<InetSocketAddress> addresses = new HashSet<InetSocketAddress>(allLocalAddresses.size());
173 for (SocketAddress socketAddress : allLocalAddresses) {
174 addresses.add((InetSocketAddress) socketAddress);
175 }
176 return addresses;
177 } catch (Throwable ignored) {
178 return Collections.emptySet();
179 }
180 }
181
182 @Override
183 protected SctpChannel javaChannel() {
184 return (SctpChannel) super.javaChannel();
185 }
186
187 @Override
188 public boolean isActive() {
189 SctpChannel ch = javaChannel();
190 return ch.isOpen() && association() != null;
191 }
192
193 @Override
194 protected SocketAddress localAddress0() {
195 try {
196 Iterator<SocketAddress> i = javaChannel().getAllLocalAddresses().iterator();
197 if (i.hasNext()) {
198 return i.next();
199 }
200 } catch (IOException e) {
201
202 }
203 return null;
204 }
205
206 @Override
207 protected SocketAddress remoteAddress0() {
208 try {
209 Iterator<SocketAddress> i = javaChannel().getRemoteAddresses().iterator();
210 if (i.hasNext()) {
211 return i.next();
212 }
213 } catch (IOException e) {
214
215 }
216 return null;
217 }
218
219 @Override
220 protected void doBind(SocketAddress localAddress) throws Exception {
221 javaChannel().bind(localAddress);
222 }
223
224 @Override
225 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
226 if (localAddress != null) {
227 javaChannel().bind(localAddress);
228 }
229
230 boolean success = false;
231 try {
232 boolean connected = javaChannel().connect(remoteAddress);
233 if (!connected) {
234 selectionKey().interestOps(SelectionKey.OP_CONNECT);
235 }
236 success = true;
237 return connected;
238 } finally {
239 if (!success) {
240 doClose();
241 }
242 }
243 }
244
245 @Override
246 protected void doFinishConnect() throws Exception {
247 if (!javaChannel().finishConnect()) {
248 throw new Error();
249 }
250 }
251
252 @Override
253 protected void doDisconnect() throws Exception {
254 doClose();
255 }
256
257 @Override
258 protected void doClose() throws Exception {
259 javaChannel().close();
260 }
261
262 @Override
263 protected int doReadMessages(List<Object> buf) throws Exception {
264 SctpChannel ch = javaChannel();
265
266 RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
267 ByteBuf buffer = allocHandle.allocate(config().getAllocator());
268 boolean free = true;
269 try {
270 ByteBuffer data = buffer.internalNioBuffer(buffer.writerIndex(), buffer.writableBytes());
271 int pos = data.position();
272
273 MessageInfo messageInfo = ch.receive(data, null, notificationHandler);
274 if (messageInfo == null) {
275 return 0;
276 }
277
278 allocHandle.lastBytesRead(data.position() - pos);
279 buf.add(new SctpMessage(messageInfo,
280 buffer.writerIndex(buffer.writerIndex() + allocHandle.lastBytesRead())));
281 free = false;
282 return 1;
283 } catch (Throwable cause) {
284 PlatformDependent.throwException(cause);
285 return -1;
286 } finally {
287 if (free) {
288 buffer.release();
289 }
290 }
291 }
292
293 @Override
294 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
295 SctpMessage packet = (SctpMessage) msg;
296 ByteBuf data = packet.content();
297 int dataLen = data.readableBytes();
298 if (dataLen == 0) {
299 return true;
300 }
301
302 ByteBufAllocator alloc = alloc();
303 boolean needsCopy = data.nioBufferCount() != 1;
304 if (!needsCopy) {
305 if (!data.isDirect() && alloc.isDirectBufferPooled()) {
306 needsCopy = true;
307 }
308 }
309 ByteBuffer nioData;
310 if (needsCopy) {
311 data = alloc.directBuffer(dataLen).writeBytes(data);
312 }
313 nioData = data.nioBuffer();
314 final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
315 mi.payloadProtocolID(packet.protocolIdentifier());
316 mi.streamNumber(packet.streamIdentifier());
317 mi.unordered(packet.isUnordered());
318
319 final int writtenBytes = javaChannel().send(nioData, mi);
320 return writtenBytes > 0;
321 }
322
323 @Override
324 protected final Object filterOutboundMessage(Object msg) throws Exception {
325 if (msg instanceof SctpMessage) {
326 SctpMessage m = (SctpMessage) msg;
327 ByteBuf buf = m.content();
328 if (buf.isDirect() && buf.nioBufferCount() == 1) {
329 return m;
330 }
331
332 return new SctpMessage(m.protocolIdentifier(), m.streamIdentifier(), m.isUnordered(),
333 newDirectBuffer(m, buf));
334 }
335
336 throw new UnsupportedOperationException(
337 "unsupported message type: " + StringUtil.simpleClassName(msg) +
338 " (expected: " + StringUtil.simpleClassName(SctpMessage.class));
339 }
340
341 @Override
342 public ChannelFuture bindAddress(InetAddress localAddress) {
343 return bindAddress(localAddress, newPromise());
344 }
345
346 @Override
347 public ChannelFuture bindAddress(final InetAddress localAddress, final ChannelPromise promise) {
348 if (eventLoop().inEventLoop()) {
349 try {
350 javaChannel().bindAddress(localAddress);
351 promise.setSuccess();
352 } catch (Throwable t) {
353 promise.setFailure(t);
354 }
355 } else {
356 eventLoop().execute(new Runnable() {
357 @Override
358 public void run() {
359 bindAddress(localAddress, promise);
360 }
361 });
362 }
363 return promise;
364 }
365
366 @Override
367 public ChannelFuture unbindAddress(InetAddress localAddress) {
368 return unbindAddress(localAddress, newPromise());
369 }
370
371 @Override
372 public ChannelFuture unbindAddress(final InetAddress localAddress, final ChannelPromise promise) {
373 if (eventLoop().inEventLoop()) {
374 try {
375 javaChannel().unbindAddress(localAddress);
376 promise.setSuccess();
377 } catch (Throwable t) {
378 promise.setFailure(t);
379 }
380 } else {
381 eventLoop().execute(new Runnable() {
382 @Override
383 public void run() {
384 unbindAddress(localAddress, promise);
385 }
386 });
387 }
388 return promise;
389 }
390
391 private final class NioSctpChannelConfig extends DefaultSctpChannelConfig {
392 private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) {
393 super(channel, javaChannel);
394 }
395
396 @Override
397 protected void autoReadCleared() {
398 clearReadPending();
399 }
400 }
401 }