1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.oio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOption;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.FileRegion;
28 import io.netty.channel.RecvByteBufAllocator;
29 import io.netty.channel.socket.ChannelInputShutdownEvent;
30 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
31 import io.netty.util.internal.StringUtil;
32
33 import java.io.IOException;
34
35
36
37
38
39
40 public abstract class AbstractOioByteChannel extends AbstractOioChannel {
41
42 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
43 private static final String EXPECTED_TYPES =
44 " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
45 StringUtil.simpleClassName(FileRegion.class) + ')';
46
47
48
49
50 protected AbstractOioByteChannel(Channel parent) {
51 super(parent);
52 }
53
54 @Override
55 public ChannelMetadata metadata() {
56 return METADATA;
57 }
58
59
60
61
62
63 protected abstract boolean isInputShutdown();
64
65
66
67
68
69 protected abstract ChannelFuture shutdownInput();
70
71 private void closeOnRead(ChannelPipeline pipeline) {
72 if (isOpen()) {
73 if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
74 shutdownInput();
75 pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
76 } else {
77 unsafe().close(unsafe().voidPromise());
78 }
79 pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
80 }
81 }
82
83 private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
84 RecvByteBufAllocator.Handle allocHandle) {
85 if (byteBuf != null) {
86 if (byteBuf.isReadable()) {
87 readPending = false;
88 pipeline.fireChannelRead(byteBuf);
89 } else {
90 byteBuf.release();
91 }
92 }
93 allocHandle.readComplete();
94 pipeline.fireChannelReadComplete();
95 pipeline.fireExceptionCaught(cause);
96
97
98
99 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
100 closeOnRead(pipeline);
101 }
102 }
103
104 @Override
105 protected void doRead() {
106 final ChannelConfig config = config();
107 if (isInputShutdown() || !readPending) {
108
109
110 return;
111 }
112
113
114 readPending = false;
115
116 final ChannelPipeline pipeline = pipeline();
117 final ByteBufAllocator allocator = config.getAllocator();
118 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
119 allocHandle.reset(config);
120
121 ByteBuf byteBuf = null;
122 boolean close = false;
123 boolean readData = false;
124 try {
125 byteBuf = allocHandle.allocate(allocator);
126 do {
127 allocHandle.lastBytesRead(doReadBytes(byteBuf));
128 if (allocHandle.lastBytesRead() <= 0) {
129 if (!byteBuf.isReadable()) {
130 byteBuf.release();
131 byteBuf = null;
132 close = allocHandle.lastBytesRead() < 0;
133 if (close) {
134
135 readPending = false;
136 }
137 }
138 break;
139 } else {
140 readData = true;
141 }
142
143 final int available = available();
144 if (available <= 0) {
145 break;
146 }
147
148
149 if (!byteBuf.isWritable()) {
150 final int capacity = byteBuf.capacity();
151 final int maxCapacity = byteBuf.maxCapacity();
152 if (capacity == maxCapacity) {
153 allocHandle.incMessagesRead(1);
154 readPending = false;
155 pipeline.fireChannelRead(byteBuf);
156 byteBuf = allocHandle.allocate(allocator);
157 } else {
158 final int writerIndex = byteBuf.writerIndex();
159 if (writerIndex + available > maxCapacity) {
160 byteBuf.capacity(maxCapacity);
161 } else {
162 byteBuf.ensureWritable(available);
163 }
164 }
165 }
166 } while (allocHandle.continueReading());
167
168 if (byteBuf != null) {
169
170
171 if (byteBuf.isReadable()) {
172 readPending = false;
173 pipeline.fireChannelRead(byteBuf);
174 } else {
175 byteBuf.release();
176 }
177 byteBuf = null;
178 }
179
180 if (readData) {
181 allocHandle.readComplete();
182 pipeline.fireChannelReadComplete();
183 }
184
185 if (close) {
186 closeOnRead(pipeline);
187 }
188 } catch (Throwable t) {
189 handleReadException(pipeline, byteBuf, t, close, allocHandle);
190 } finally {
191 if (readPending || config.isAutoRead() || !readData && isActive()) {
192
193
194 read();
195 }
196 }
197 }
198
199 @Override
200 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
201 for (;;) {
202 Object msg = in.current();
203 if (msg == null) {
204
205 break;
206 }
207 if (msg instanceof ByteBuf) {
208 ByteBuf buf = (ByteBuf) msg;
209 int readableBytes = buf.readableBytes();
210 while (readableBytes > 0) {
211 doWriteBytes(buf);
212 int newReadableBytes = buf.readableBytes();
213 in.progress(readableBytes - newReadableBytes);
214 readableBytes = newReadableBytes;
215 }
216 in.remove();
217 } else if (msg instanceof FileRegion) {
218 FileRegion region = (FileRegion) msg;
219 long transferred = region.transferred();
220 doWriteFileRegion(region);
221 in.progress(region.transferred() - transferred);
222 in.remove();
223 } else {
224 in.remove(new UnsupportedOperationException(
225 "unsupported message type: " + StringUtil.simpleClassName(msg)));
226 }
227 }
228 }
229
230 @Override
231 protected final Object filterOutboundMessage(Object msg) throws Exception {
232 if (msg instanceof ByteBuf || msg instanceof FileRegion) {
233 return msg;
234 }
235
236 throw new UnsupportedOperationException(
237 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
238 }
239
240
241
242
243 protected abstract int available();
244
245
246
247
248
249
250
251
252
253 protected abstract int doReadBytes(ByteBuf buf) throws Exception;
254
255
256
257
258
259
260
261 protected abstract void doWriteBytes(ByteBuf buf) throws Exception;
262
263
264
265
266
267
268
269 protected abstract void doWriteFileRegion(FileRegion region) throws Exception;
270 }