1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelOption;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.FileRegion;
26 import io.netty.channel.RecvByteBufAllocator;
27 import io.netty.channel.socket.ChannelInputShutdownEvent;
28 import io.netty.util.internal.StringUtil;
29
30 import java.io.IOException;
31 import java.nio.channels.SelectableChannel;
32 import java.nio.channels.SelectionKey;
33
34
35
36
37 public abstract class AbstractNioByteChannel extends AbstractNioChannel {
38
39 private static final String EXPECTED_TYPES =
40 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
41 StringUtil.simpleClassName(FileRegion.class) + ')';
42
43 private Runnable flushTask;
44
45
46
47
48
49
50
51 protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
52 super(parent, ch, SelectionKey.OP_READ);
53 }
54
55 @Override
56 protected AbstractNioUnsafe newUnsafe() {
57 return new NioByteUnsafe();
58 }
59
60 protected class NioByteUnsafe extends AbstractNioUnsafe {
61 private RecvByteBufAllocator.Handle allocHandle;
62
63 private void closeOnRead(ChannelPipeline pipeline) {
64 SelectionKey key = selectionKey();
65 setInputShutdown();
66 if (isOpen()) {
67 if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
68 key.interestOps(key.interestOps() & ~readInterestOp);
69 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
70 } else {
71 close(voidPromise());
72 }
73 }
74 }
75
76 private void handleReadException(ChannelPipeline pipeline,
77 ByteBuf byteBuf, Throwable cause, boolean close) {
78 if (byteBuf != null) {
79 if (byteBuf.isReadable()) {
80 setReadPending(false);
81 pipeline.fireChannelRead(byteBuf);
82 } else {
83 byteBuf.release();
84 }
85 }
86 pipeline.fireChannelReadComplete();
87 pipeline.fireExceptionCaught(cause);
88 if (close || cause instanceof IOException) {
89 closeOnRead(pipeline);
90 }
91 }
92
93 @Override
94 public final void read() {
95 final ChannelConfig config = config();
96 if (!config.isAutoRead() && !isReadPending()) {
97
98 removeReadOp();
99 return;
100 }
101
102 final ChannelPipeline pipeline = pipeline();
103 final ByteBufAllocator allocator = config.getAllocator();
104 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
105 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
106 if (allocHandle == null) {
107 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
108 }
109
110 ByteBuf byteBuf = null;
111 int messages = 0;
112 boolean close = false;
113 try {
114 int totalReadAmount = 0;
115 boolean readPendingReset = false;
116 do {
117 byteBuf = allocHandle.allocate(allocator);
118 int writable = byteBuf.writableBytes();
119 int localReadAmount = doReadBytes(byteBuf);
120 if (localReadAmount <= 0) {
121
122 byteBuf.release();
123 byteBuf = null;
124 close = localReadAmount < 0;
125 if (close) {
126
127 setReadPending(false);
128 }
129 break;
130 }
131 if (!readPendingReset) {
132 readPendingReset = true;
133 setReadPending(false);
134 }
135 pipeline.fireChannelRead(byteBuf);
136 byteBuf = null;
137
138 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
139
140 totalReadAmount = Integer.MAX_VALUE;
141 break;
142 }
143
144 totalReadAmount += localReadAmount;
145
146
147 if (!config.isAutoRead()) {
148 break;
149 }
150
151 if (localReadAmount < writable) {
152
153
154 break;
155 }
156 } while (++ messages < maxMessagesPerRead);
157
158 pipeline.fireChannelReadComplete();
159 allocHandle.record(totalReadAmount);
160
161 if (close) {
162 closeOnRead(pipeline);
163 close = false;
164 }
165 } catch (Throwable t) {
166 handleReadException(pipeline, byteBuf, t, close);
167 } finally {
168
169
170
171
172
173
174 if (!config.isAutoRead() && !isReadPending()) {
175 removeReadOp();
176 }
177 }
178 }
179 }
180
181 @Override
182 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
183 int writeSpinCount = -1;
184
185 boolean setOpWrite = false;
186 for (;;) {
187 Object msg = in.current();
188 if (msg == null) {
189
190 clearOpWrite();
191
192 return;
193 }
194
195 if (msg instanceof ByteBuf) {
196 ByteBuf buf = (ByteBuf) msg;
197 int readableBytes = buf.readableBytes();
198 if (readableBytes == 0) {
199 in.remove();
200 continue;
201 }
202
203 boolean done = false;
204 long flushedAmount = 0;
205 if (writeSpinCount == -1) {
206 writeSpinCount = config().getWriteSpinCount();
207 }
208 for (int i = writeSpinCount - 1; i >= 0; i --) {
209 int localFlushedAmount = doWriteBytes(buf);
210 if (localFlushedAmount == 0) {
211 setOpWrite = true;
212 break;
213 }
214
215 flushedAmount += localFlushedAmount;
216 if (!buf.isReadable()) {
217 done = true;
218 break;
219 }
220 }
221
222 in.progress(flushedAmount);
223
224 if (done) {
225 in.remove();
226 } else {
227
228 break;
229 }
230 } else if (msg instanceof FileRegion) {
231 FileRegion region = (FileRegion) msg;
232 boolean done = region.transfered() >= region.count();
233
234 if (!done) {
235 long flushedAmount = 0;
236 if (writeSpinCount == -1) {
237 writeSpinCount = config().getWriteSpinCount();
238 }
239
240 for (int i = writeSpinCount - 1; i >= 0; i--) {
241 long localFlushedAmount = doWriteFileRegion(region);
242 if (localFlushedAmount == 0) {
243 setOpWrite = true;
244 break;
245 }
246
247 flushedAmount += localFlushedAmount;
248 if (region.transfered() >= region.count()) {
249 done = true;
250 break;
251 }
252 }
253
254 in.progress(flushedAmount);
255 }
256
257 if (done) {
258 in.remove();
259 } else {
260
261 break;
262 }
263 } else {
264
265 throw new Error();
266 }
267 }
268 incompleteWrite(setOpWrite);
269 }
270
271 @Override
272 protected final Object filterOutboundMessage(Object msg) {
273 if (msg instanceof ByteBuf) {
274 ByteBuf buf = (ByteBuf) msg;
275 if (buf.isDirect()) {
276 return msg;
277 }
278
279 return newDirectBuffer(buf);
280 }
281
282 if (msg instanceof FileRegion) {
283 return msg;
284 }
285
286 throw new UnsupportedOperationException(
287 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
288 }
289
290 protected final void incompleteWrite(boolean setOpWrite) {
291
292 if (setOpWrite) {
293 setOpWrite();
294 } else {
295
296 Runnable flushTask = this.flushTask;
297 if (flushTask == null) {
298 flushTask = this.flushTask = new Runnable() {
299 @Override
300 public void run() {
301 flush();
302 }
303 };
304 }
305 eventLoop().execute(flushTask);
306 }
307 }
308
309
310
311
312
313
314
315 protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
316
317
318
319
320 protected abstract int doReadBytes(ByteBuf buf) throws Exception;
321
322
323
324
325
326
327 protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
328
329 protected final void setOpWrite() {
330 final SelectionKey key = selectionKey();
331
332
333
334 if (!key.isValid()) {
335 return;
336 }
337 final int interestOps = key.interestOps();
338 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
339 key.interestOps(interestOps | SelectionKey.OP_WRITE);
340 }
341 }
342
343 protected final void clearOpWrite() {
344 final SelectionKey key = selectionKey();
345
346
347
348 if (!key.isValid()) {
349 return;
350 }
351 final int interestOps = key.interestOps();
352 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
353 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
354 }
355 }
356 }