1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.AbstractChannel;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelFactory;
22 import org.jboss.netty.channel.ChannelPipeline;
23 import org.jboss.netty.channel.ChannelSink;
24 import org.jboss.netty.channel.MessageEvent;
25 import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
26 import org.jboss.netty.util.internal.ThreadLocalBoolean;
27
28 import java.net.InetSocketAddress;
29 import java.nio.channels.SelectableChannel;
30 import java.nio.channels.WritableByteChannel;
31 import java.util.Collection;
32 import java.util.Iterator;
33 import java.util.Queue;
34 import java.util.concurrent.ConcurrentLinkedQueue;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import static org.jboss.netty.channel.Channels.*;
39 import static org.jboss.netty.channel.socket.nio.AbstractNioWorker.isIoThread;
40
41 abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
42
43
44
45
46 final AbstractNioWorker worker;
47
48
49
50
51 final Object writeLock = new Object();
52
53
54
55
56 final Runnable writeTask = new WriteTask();
57
58
59
60
61 final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
62
63
64
65
66 final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();
67
68
69
70
71
72 final AtomicInteger writeBufferSize = new AtomicInteger();
73
74
75
76
77 final AtomicInteger highWaterMarkCounter = new AtomicInteger();
78
79
80
81
82 MessageEvent currentWriteEvent;
83 SendBuffer currentWriteBuffer;
84
85
86
87
88 boolean inWriteNowLoop;
89 boolean writeSuspended;
90
91 private volatile InetSocketAddress localAddress;
92 volatile InetSocketAddress remoteAddress;
93
94 final C channel;
95
96 protected AbstractNioChannel(
97 Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline,
98 ChannelSink sink, AbstractNioWorker worker, C ch) {
99 super(id, parent, factory, pipeline, sink);
100 this.worker = worker;
101 channel = ch;
102 }
103
104 protected AbstractNioChannel(
105 Channel parent, ChannelFactory factory,
106 ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch) {
107 super(parent, factory, pipeline, sink);
108 this.worker = worker;
109 channel = ch;
110 }
111
112
113
114
115
116
117
118 public AbstractNioWorker getWorker() {
119 return worker;
120 }
121
122 public InetSocketAddress getLocalAddress() {
123 InetSocketAddress localAddress = this.localAddress;
124 if (localAddress == null) {
125 try {
126 localAddress = getLocalSocketAddress();
127 if (localAddress.getAddress().isAnyLocalAddress()) {
128
129
130 return localAddress;
131 }
132 this.localAddress = localAddress;
133 } catch (Throwable t) {
134
135 return null;
136 }
137 }
138 return localAddress;
139 }
140
141 public InetSocketAddress getRemoteAddress() {
142 InetSocketAddress remoteAddress = this.remoteAddress;
143 if (remoteAddress == null) {
144 try {
145 this.remoteAddress = remoteAddress =
146 getRemoteSocketAddress();
147 } catch (Throwable t) {
148
149 return null;
150 }
151 }
152 return remoteAddress;
153 }
154
155 public abstract NioChannelConfig getConfig();
156
157 @Override
158 protected int getInternalInterestOps() {
159 return super.getInternalInterestOps();
160 }
161
162 @Override
163 protected void setInternalInterestOps(int interestOps) {
164 super.setInternalInterestOps(interestOps);
165 }
166
167 @Override
168 protected boolean setClosed() {
169 return super.setClosed();
170 }
171
172 abstract InetSocketAddress getLocalSocketAddress() throws Exception;
173
174 abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
175
176 private final class WriteRequestQueue implements Queue<MessageEvent> {
177 private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
178
179 private final Queue<MessageEvent> queue;
180
181 public WriteRequestQueue() {
182 queue = new ConcurrentLinkedQueue<MessageEvent>();
183 }
184
185 public MessageEvent remove() {
186 return queue.remove();
187 }
188
189 public MessageEvent element() {
190 return queue.element();
191 }
192
193 public MessageEvent peek() {
194 return queue.peek();
195 }
196
197 public int size() {
198 return queue.size();
199 }
200
201 public boolean isEmpty() {
202 return queue.isEmpty();
203 }
204
205 public Iterator<MessageEvent> iterator() {
206 return queue.iterator();
207 }
208
209 public Object[] toArray() {
210 return queue.toArray();
211 }
212
213 public <T> T[] toArray(T[] a) {
214 return queue.toArray(a);
215 }
216
217 public boolean containsAll(Collection<?> c) {
218 return queue.containsAll(c);
219 }
220
221 public boolean addAll(Collection<? extends MessageEvent> c) {
222 return queue.addAll(c);
223 }
224
225 public boolean removeAll(Collection<?> c) {
226 return queue.removeAll(c);
227 }
228
229 public boolean retainAll(Collection<?> c) {
230 return queue.retainAll(c);
231 }
232
233 public void clear() {
234 queue.clear();
235 }
236
237 public boolean add(MessageEvent e) {
238 return queue.add(e);
239 }
240
241 public boolean remove(Object o) {
242 return queue.remove(o);
243 }
244
245 public boolean contains(Object o) {
246 return queue.contains(o);
247 }
248
249 public boolean offer(MessageEvent e) {
250 boolean success = queue.offer(e);
251 assert success;
252
253 int messageSize = getMessageSize(e);
254 int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
255 int highWaterMark = getConfig().getWriteBufferHighWaterMark();
256
257 if (newWriteBufferSize >= highWaterMark) {
258 if (newWriteBufferSize - messageSize < highWaterMark) {
259 highWaterMarkCounter.incrementAndGet();
260 if (setUnwritable()) {
261 if (!isIoThread(AbstractNioChannel.this)) {
262 fireChannelInterestChangedLater(AbstractNioChannel.this);
263 } else if (!notifying.get()) {
264 notifying.set(Boolean.TRUE);
265 fireChannelInterestChanged(AbstractNioChannel.this);
266 notifying.set(Boolean.FALSE);
267 }
268 }
269 }
270 }
271 return true;
272 }
273
274 public MessageEvent poll() {
275 MessageEvent e = queue.poll();
276 if (e != null) {
277 int messageSize = getMessageSize(e);
278 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
279 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
280
281 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
282 if (newWriteBufferSize + messageSize >= lowWaterMark) {
283 highWaterMarkCounter.decrementAndGet();
284 if (isConnected() && setWritable()) {
285 if (!isIoThread(AbstractNioChannel.this)) {
286 fireChannelInterestChangedLater(AbstractNioChannel.this);
287 } else if (!notifying.get()) {
288 notifying.set(Boolean.TRUE);
289 fireChannelInterestChanged(AbstractNioChannel.this);
290 notifying.set(Boolean.FALSE);
291 }
292 }
293 }
294 }
295 }
296 return e;
297 }
298
299 private int getMessageSize(MessageEvent e) {
300 Object m = e.getMessage();
301 if (m instanceof ChannelBuffer) {
302 return ((ChannelBuffer) m).readableBytes();
303 }
304 return 0;
305 }
306 }
307
308 private final class WriteTask implements Runnable {
309
310 WriteTask() {
311 }
312
313 public void run() {
314 writeTaskInTaskQueue.set(false);
315 worker.writeFromTaskLoop(AbstractNioChannel.this);
316 }
317 }
318
319 }