1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.RecvByteBufAllocator;
23 import io.netty.channel.ServerChannel;
24
25 import java.io.IOException;
26 import java.net.PortUnreachableException;
27 import java.nio.channels.SelectableChannel;
28 import java.nio.channels.SelectionKey;
29 import java.util.ArrayList;
30 import java.util.List;
31
32
33
34
35 public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
36 boolean inputShutdown;
37
38
39
40
41 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
42 super(parent, ch, readInterestOp);
43 }
44
45 @Override
46 protected AbstractNioUnsafe newUnsafe() {
47 return new NioMessageUnsafe();
48 }
49
50 @Override
51 protected void doBeginRead() throws Exception {
52 if (inputShutdown) {
53 return;
54 }
55 super.doBeginRead();
56 }
57
58 protected boolean continueReading(RecvByteBufAllocator.Handle allocHandle) {
59 return allocHandle.continueReading();
60 }
61
62 private final class NioMessageUnsafe extends AbstractNioUnsafe {
63
64 private final List<Object> readBuf = new ArrayList<Object>();
65
66 @Override
67 public void read() {
68 assert eventLoop().inEventLoop();
69 final ChannelConfig config = config();
70 final ChannelPipeline pipeline = pipeline();
71 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
72 allocHandle.reset(config);
73
74 boolean closed = false;
75 Throwable exception = null;
76 try {
77 try {
78 do {
79 int localRead = doReadMessages(readBuf);
80 if (localRead == 0) {
81 break;
82 }
83 if (localRead < 0) {
84 closed = true;
85 break;
86 }
87
88 allocHandle.incMessagesRead(localRead);
89 } while (continueReading(allocHandle));
90 } catch (Throwable t) {
91 exception = t;
92 }
93
94 int size = readBuf.size();
95 for (int i = 0; i < size; i ++) {
96 readPending = false;
97 pipeline.fireChannelRead(readBuf.get(i));
98 }
99 readBuf.clear();
100 allocHandle.readComplete();
101 pipeline.fireChannelReadComplete();
102
103 if (exception != null) {
104 closed = closeOnReadError(exception);
105
106 pipeline.fireExceptionCaught(exception);
107 }
108
109 if (closed) {
110 inputShutdown = true;
111 if (isOpen()) {
112 close(voidPromise());
113 }
114 }
115 } finally {
116
117
118
119
120
121
122 if (!readPending && !config.isAutoRead()) {
123 removeReadOp();
124 }
125 }
126 }
127 }
128
129 @Override
130 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
131 final SelectionKey key = selectionKey();
132 final int interestOps = key.interestOps();
133
134 int maxMessagesPerWrite = maxMessagesPerWrite();
135 while (maxMessagesPerWrite > 0) {
136 Object msg = in.current();
137 if (msg == null) {
138 break;
139 }
140 try {
141 boolean done = false;
142 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
143 if (doWriteMessage(msg, in)) {
144 done = true;
145 break;
146 }
147 }
148
149 if (done) {
150 maxMessagesPerWrite--;
151 in.remove();
152 } else {
153 break;
154 }
155 } catch (Exception e) {
156 if (continueOnWriteError()) {
157 maxMessagesPerWrite--;
158 in.remove(e);
159 } else {
160 throw e;
161 }
162 }
163 }
164 if (in.isEmpty()) {
165
166 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
167 key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
168 }
169 } else {
170
171 if ((interestOps & SelectionKey.OP_WRITE) == 0) {
172 key.interestOps(interestOps | SelectionKey.OP_WRITE);
173 }
174 }
175 }
176
177
178
179
180 protected boolean continueOnWriteError() {
181 return false;
182 }
183
184 protected boolean closeOnReadError(Throwable cause) {
185 if (!isActive()) {
186
187 return true;
188 }
189 if (cause instanceof PortUnreachableException) {
190 return false;
191 }
192 if (cause instanceof IOException) {
193
194
195 return !(this instanceof ServerChannel);
196 }
197 return true;
198 }
199
200
201
202
203 protected abstract int doReadMessages(List<Object> buf) throws Exception;
204
205
206
207
208
209
210 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
211 }