1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.FileRegion;
27 import io.netty.channel.RecvByteBufAllocator;
28 import io.netty.channel.internal.ChannelUtils;
29 import io.netty.channel.socket.ChannelInputShutdownEvent;
30 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31 import io.netty.channel.socket.SocketChannelConfig;
32 import io.netty.util.internal.StringUtil;
33
34 import java.io.IOException;
35 import java.nio.channels.SelectableChannel;
36 import java.nio.channels.SelectionKey;
37
38 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
39
40
41
42
43 public abstract class AbstractNioByteChannel extends AbstractNioChannel {
44 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
45 private static final String EXPECTED_TYPES =
46 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
47 StringUtil.simpleClassName(FileRegion.class) + ')';
48
49 private final Runnable flushTask = new Runnable() {
50 @Override
51 public void run() {
52
53
54 ((AbstractNioUnsafe) unsafe()).flush0();
55 }
56 };
57 private boolean inputClosedSeenErrorOnRead;
58
59
60
61
62
63
64
65 protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
66 super(parent, ch, SelectionKey.OP_READ);
67 }
68
69
70
71
72 protected abstract ChannelFuture shutdownInput();
73
74 protected boolean isInputShutdown0() {
75 return false;
76 }
77
78 @Override
79 protected AbstractNioUnsafe newUnsafe() {
80 return new NioByteUnsafe();
81 }
82
83 @Override
84 public ChannelMetadata metadata() {
85 return METADATA;
86 }
87
88 final boolean shouldBreakReadReady(ChannelConfig config) {
89 return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
90 }
91
92 private static boolean isAllowHalfClosure(ChannelConfig config) {
93 return config instanceof SocketChannelConfig &&
94 ((SocketChannelConfig) config).isAllowHalfClosure();
95 }
96
97 protected class NioByteUnsafe extends AbstractNioUnsafe {
98
99 private void closeOnRead(ChannelPipeline pipeline) {
100 if (!isInputShutdown0()) {
101 if (isAllowHalfClosure(config())) {
102 shutdownInput();
103 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
104 } else {
105 close(voidPromise());
106 }
107 } else if (!inputClosedSeenErrorOnRead) {
108 inputClosedSeenErrorOnRead = true;
109 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
110 }
111 }
112
113 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
114 RecvByteBufAllocator.Handle allocHandle) {
115 if (byteBuf != null) {
116 if (byteBuf.isReadable()) {
117 readPending = false;
118 pipeline.fireChannelRead(byteBuf);
119 } else {
120 byteBuf.release();
121 }
122 }
123 allocHandle.readComplete();
124 pipeline.fireChannelReadComplete();
125 pipeline.fireExceptionCaught(cause);
126
127
128
129 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
130 closeOnRead(pipeline);
131 }
132 }
133
134 @Override
135 public final void read() {
136 final ChannelConfig config = config();
137 if (shouldBreakReadReady(config)) {
138 clearReadPending();
139 return;
140 }
141 final ChannelPipeline pipeline = pipeline();
142 final ByteBufAllocator allocator = config.getAllocator();
143 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
144 allocHandle.reset(config);
145
146 ByteBuf byteBuf = null;
147 boolean close = false;
148 try {
149 do {
150 byteBuf = allocHandle.allocate(allocator);
151 allocHandle.lastBytesRead(doReadBytes(byteBuf));
152 if (allocHandle.lastBytesRead() <= 0) {
153
154 byteBuf.release();
155 byteBuf = null;
156 close = allocHandle.lastBytesRead() < 0;
157 if (close) {
158
159 readPending = false;
160 }
161 break;
162 }
163
164 allocHandle.incMessagesRead(1);
165 readPending = false;
166 pipeline.fireChannelRead(byteBuf);
167 byteBuf = null;
168 } while (allocHandle.continueReading());
169
170 allocHandle.readComplete();
171 pipeline.fireChannelReadComplete();
172
173 if (close) {
174 closeOnRead(pipeline);
175 }
176 } catch (Throwable t) {
177 handleReadException(pipeline, byteBuf, t, close, allocHandle);
178 } finally {
179
180
181
182
183
184
185 if (!readPending && !config.isAutoRead()) {
186 removeReadOp();
187 }
188 }
189 }
190 }
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206 protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
207 Object msg = in.current();
208 if (msg == null) {
209
210 return 0;
211 }
212 return doWriteInternal(in, in.current());
213 }
214
215 private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
216 if (msg instanceof ByteBuf) {
217 ByteBuf buf = (ByteBuf) msg;
218 if (!buf.isReadable()) {
219 in.remove();
220 return 0;
221 }
222
223 final int localFlushedAmount = doWriteBytes(buf);
224 if (localFlushedAmount > 0) {
225 in.progress(localFlushedAmount);
226 if (!buf.isReadable()) {
227 in.remove();
228 }
229 return 1;
230 }
231 } else if (msg instanceof FileRegion) {
232 FileRegion region = (FileRegion) msg;
233 if (region.transferred() >= region.count()) {
234 in.remove();
235 return 0;
236 }
237
238 long localFlushedAmount = doWriteFileRegion(region);
239 if (localFlushedAmount > 0) {
240 in.progress(localFlushedAmount);
241 if (region.transferred() >= region.count()) {
242 in.remove();
243 }
244 return 1;
245 }
246 } else {
247
248 throw new Error();
249 }
250 return WRITE_STATUS_SNDBUF_FULL;
251 }
252
253 @Override
254 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
255 int writeSpinCount = config().getWriteSpinCount();
256 do {
257 Object msg = in.current();
258 if (msg == null) {
259
260 clearOpWrite();
261
262 return;
263 }
264 writeSpinCount -= doWriteInternal(in, msg);
265 } while (writeSpinCount > 0);
266
267 incompleteWrite(writeSpinCount < 0);
268 }
269
270 @Override
271 protected final Object filterOutboundMessage(Object msg) {
272 if (msg instanceof ByteBuf) {
273 ByteBuf buf = (ByteBuf) msg;
274 if (buf.isDirect()) {
275 return msg;
276 }
277
278 return newDirectBuffer(buf);
279 }
280
281 if (msg instanceof FileRegion) {
282 return msg;
283 }
284
285 throw new UnsupportedOperationException(
286 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
287 }
288
289 protected final void incompleteWrite(boolean setOpWrite) {
290
291 if (setOpWrite) {
292 setOpWrite();
293 } else {
294
295
296
297
298 clearOpWrite();
299
300
301 eventLoop().execute(flushTask);
302 }
303 }
304
305
306
307
308
309
310
311 protected abstract long doWriteFileRegion(FileRegion region) throws Exception;
312
313
314
315
316 protected abstract int doReadBytes(ByteBuf buf) throws Exception;
317
318
319
320
321
322
323 protected abstract int doWriteBytes(ByteBuf buf) throws Exception;
324
325 protected final void setOpWrite() {
326 final SelectionKey key = selectionKey();
327
328
329
330 if (!key.isValid()) {
331 return;
332 }
333 final int interestOps = key.interestOps();
334 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
335 key.interestOps(interestOps | SelectionKey.OP_WRITE);
336 }
337 }
338
339 protected final void clearOpWrite() {
340 final SelectionKey key = selectionKey();
341
342
343
344 if (!key.isValid()) {
345 return;
346 }
347 final int interestOps = key.interestOps();
348 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
349 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
350 }
351 }
352 }