1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBufferFactory;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelException;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
24 import org.jboss.netty.util.ThreadNameDeterminer;
25
26 import java.io.IOException;
27 import java.net.SocketAddress;
28 import java.nio.ByteBuffer;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.SocketChannel;
32 import java.util.concurrent.Executor;
33
34 import static org.jboss.netty.channel.Channels.*;
35
36 public class NioWorker extends AbstractNioWorker {
37
38 private final SocketReceiveBufferAllocator recvBufferPool = new SocketReceiveBufferAllocator();
39
40 public NioWorker(Executor executor) {
41 super(executor);
42 }
43
44 public NioWorker(Executor executor, ThreadNameDeterminer determiner) {
45 super(executor, determiner);
46 }
47
48 @Override
49 protected boolean read(SelectionKey k) {
50 final SocketChannel ch = (SocketChannel) k.channel();
51 final NioSocketChannel channel = (NioSocketChannel) k.attachment();
52
53 final ReceiveBufferSizePredictor predictor =
54 channel.getConfig().getReceiveBufferSizePredictor();
55 final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
56 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
57
58 int ret = 0;
59 int readBytes = 0;
60 boolean failure = true;
61
62 ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
63 try {
64 while ((ret = ch.read(bb)) > 0) {
65 readBytes += ret;
66 if (!bb.hasRemaining()) {
67 break;
68 }
69 }
70 failure = false;
71 } catch (ClosedChannelException e) {
72
73 } catch (Throwable t) {
74 fireExceptionCaught(channel, t);
75 }
76
77 if (readBytes > 0) {
78 bb.flip();
79
80 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
81 buffer.setBytes(0, bb);
82 buffer.writerIndex(readBytes);
83
84
85 predictor.previousReceiveBufferSize(readBytes);
86
87
88 fireMessageReceived(channel, buffer);
89 }
90
91 if (ret < 0 || failure) {
92 k.cancel();
93 close(channel, succeededFuture(channel));
94 return false;
95 }
96
97 return true;
98 }
99
100 @Override
101 protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
102 final Thread currentThread = Thread.currentThread();
103 final Thread workerThread = thread;
104 if (currentThread != workerThread) {
105 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
106 registerTask(channel.writeTask);
107 }
108
109 return true;
110 }
111
112 return false;
113 }
114
115 @Override
116 protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
117 boolean server = !(channel instanceof NioClientSocketChannel);
118 return new RegisterTask((NioSocketChannel) channel, future, server);
119 }
120
121 private final class RegisterTask implements Runnable {
122 private final NioSocketChannel channel;
123 private final ChannelFuture future;
124 private final boolean server;
125
126 RegisterTask(
127 NioSocketChannel channel, ChannelFuture future, boolean server) {
128
129 this.channel = channel;
130 this.future = future;
131 this.server = server;
132 }
133
134 public void run() {
135 SocketAddress localAddress = channel.getLocalAddress();
136 SocketAddress remoteAddress = channel.getRemoteAddress();
137
138 if (localAddress == null || remoteAddress == null) {
139 if (future != null) {
140 future.setFailure(new ClosedChannelException());
141 }
142 close(channel, succeededFuture(channel));
143 return;
144 }
145
146 try {
147 if (server) {
148 channel.channel.configureBlocking(false);
149 }
150
151 channel.channel.register(
152 selector, channel.getInternalInterestOps(), channel);
153
154 if (future != null) {
155 channel.setConnected();
156 future.setSuccess();
157 }
158
159 if (server || !((NioClientSocketChannel) channel).boundManually) {
160 fireChannelBound(channel, localAddress);
161 }
162 fireChannelConnected(channel, remoteAddress);
163 } catch (IOException e) {
164 if (future != null) {
165 future.setFailure(e);
166 }
167 close(channel, succeededFuture(channel));
168 if (!(e instanceof ClosedChannelException)) {
169 throw new ChannelException(
170 "Failed to register a socket to the selector.", e);
171 }
172 }
173 }
174 }
175
176 @Override
177 public void run() {
178 super.run();
179 recvBufferPool.releaseExternalResources();
180 }
181 }