查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.AbstractChannel;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelFactory;
22  import org.jboss.netty.channel.ChannelPipeline;
23  import org.jboss.netty.channel.ChannelSink;
24  import org.jboss.netty.channel.MessageEvent;
25  import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
26  import org.jboss.netty.util.internal.ThreadLocalBoolean;
27  
28  import java.net.InetSocketAddress;
29  import java.nio.channels.SelectableChannel;
30  import java.nio.channels.WritableByteChannel;
31  import java.util.Collection;
32  import java.util.Iterator;
33  import java.util.Queue;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import static org.jboss.netty.channel.Channels.*;
39  import static org.jboss.netty.channel.socket.nio.AbstractNioWorker.isIoThread;
40  
41  abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChannel> extends AbstractChannel {
42  
43      /**
44       * The {@link AbstractNioWorker}.
45       */
46      final AbstractNioWorker worker;
47  
48      /**
49       * Monitor object for synchronizing access to the {@link WriteRequestQueue}.
50       */
51      final Object writeLock = new Object();
52  
53      /**
54       * WriteTask that performs write operations.
55       */
56      final Runnable writeTask = new WriteTask();
57  
58      /**
59       * Indicates if there is a {@link WriteTask} in the task queue.
60       */
61      final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
62  
63      /**
64       * Queue of write {@link MessageEvent}s.
65       */
66      final Queue<MessageEvent> writeBufferQueue = new WriteRequestQueue();
67  
68      /**
69       * Keeps track of the number of bytes that the {@link WriteRequestQueue} currently
70       * contains.
71       */
72      final AtomicInteger writeBufferSize = new AtomicInteger();
73  
74      /**
75       * Keeps track of the highWaterMark.
76       */
77      final AtomicInteger highWaterMarkCounter = new AtomicInteger();
78  
79      /**
80       * The current write {@link MessageEvent}
81       */
82      MessageEvent currentWriteEvent;
83      SendBuffer currentWriteBuffer;
84  
85      /**
86       * Boolean that indicates that write operation is in progress.
87       */
88      boolean inWriteNowLoop;
89      boolean writeSuspended;
90  
91      private volatile InetSocketAddress localAddress;
92      volatile InetSocketAddress remoteAddress;
93  
94      final C channel;
95  
96      protected AbstractNioChannel(
97              Integer id, Channel parent, ChannelFactory factory, ChannelPipeline pipeline,
98              ChannelSink sink, AbstractNioWorker worker, C ch) {
99          super(id, parent, factory, pipeline, sink);
100         this.worker = worker;
101         channel = ch;
102     }
103 
104     protected AbstractNioChannel(
105             Channel parent, ChannelFactory factory,
106             ChannelPipeline pipeline, ChannelSink sink, AbstractNioWorker worker, C ch)  {
107         super(parent, factory, pipeline, sink);
108         this.worker = worker;
109         channel = ch;
110     }
111 
112     /**
113      * Return the {@link AbstractNioWorker} that handle the IO of the
114      * {@link AbstractNioChannel}
115      *
116      * @return worker
117      */
118     public AbstractNioWorker getWorker() {
119         return worker;
120     }
121 
122     public InetSocketAddress getLocalAddress() {
123         InetSocketAddress localAddress = this.localAddress;
124         if (localAddress == null) {
125             try {
126                 localAddress = getLocalSocketAddress();
127                 if (localAddress.getAddress().isAnyLocalAddress()) {
128                     // Don't cache on a wildcard address so the correct one
129                     // will be cached once the channel is connected/bound
130                     return localAddress;
131                 }
132                 this.localAddress = localAddress;
133             } catch (Throwable t) {
134                 // Sometimes fails on a closed socket in Windows.
135                 return null;
136             }
137         }
138         return localAddress;
139     }
140 
141     public InetSocketAddress getRemoteAddress() {
142         InetSocketAddress remoteAddress = this.remoteAddress;
143         if (remoteAddress == null) {
144             try {
145                 this.remoteAddress = remoteAddress =
146                     getRemoteSocketAddress();
147             } catch (Throwable t) {
148                 // Sometimes fails on a closed socket in Windows.
149                 return null;
150             }
151         }
152         return remoteAddress;
153     }
154 
155     public abstract NioChannelConfig getConfig();
156 
157     @Override
158     protected int getInternalInterestOps() {
159         return super.getInternalInterestOps();
160     }
161 
162     @Override
163     protected void setInternalInterestOps(int interestOps) {
164         super.setInternalInterestOps(interestOps);
165     }
166 
167     @Override
168     protected boolean setClosed() {
169         return super.setClosed();
170     }
171 
172     abstract InetSocketAddress getLocalSocketAddress() throws Exception;
173 
174     abstract InetSocketAddress getRemoteSocketAddress() throws Exception;
175 
176     private final class WriteRequestQueue implements Queue<MessageEvent> {
177         private final ThreadLocalBoolean notifying = new ThreadLocalBoolean();
178 
179         private final Queue<MessageEvent> queue;
180 
181         public WriteRequestQueue() {
182             queue = new ConcurrentLinkedQueue<MessageEvent>();
183         }
184 
185         public MessageEvent remove() {
186             return queue.remove();
187         }
188 
189         public MessageEvent element() {
190             return queue.element();
191         }
192 
193         public MessageEvent peek() {
194             return queue.peek();
195         }
196 
197         public int size() {
198             return queue.size();
199         }
200 
201         public boolean isEmpty() {
202             return queue.isEmpty();
203         }
204 
205         public Iterator<MessageEvent> iterator() {
206             return queue.iterator();
207         }
208 
209         public Object[] toArray() {
210             return queue.toArray();
211         }
212 
213         public <T> T[] toArray(T[] a) {
214             return queue.toArray(a);
215         }
216 
217         public boolean containsAll(Collection<?> c) {
218             return queue.containsAll(c);
219         }
220 
221         public boolean addAll(Collection<? extends MessageEvent> c) {
222             return queue.addAll(c);
223         }
224 
225         public boolean removeAll(Collection<?> c) {
226             return queue.removeAll(c);
227         }
228 
229         public boolean retainAll(Collection<?> c) {
230             return queue.retainAll(c);
231         }
232 
233         public void clear() {
234             queue.clear();
235         }
236 
237         public boolean add(MessageEvent e) {
238             return queue.add(e);
239         }
240 
241         public boolean remove(Object o) {
242             return queue.remove(o);
243         }
244 
245         public boolean contains(Object o) {
246             return queue.contains(o);
247         }
248 
249         public boolean offer(MessageEvent e) {
250             boolean success = queue.offer(e);
251             assert success;
252 
253             int messageSize = getMessageSize(e);
254             int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
255             int highWaterMark =  getConfig().getWriteBufferHighWaterMark();
256 
257             if (newWriteBufferSize >= highWaterMark) {
258                 if (newWriteBufferSize - messageSize < highWaterMark) {
259                     highWaterMarkCounter.incrementAndGet();
260                     if (setUnwritable()) {
261                         if (!isIoThread(AbstractNioChannel.this)) {
262                             fireChannelInterestChangedLater(AbstractNioChannel.this);
263                         } else if (!notifying.get()) {
264                             notifying.set(Boolean.TRUE);
265                             fireChannelInterestChanged(AbstractNioChannel.this);
266                             notifying.set(Boolean.FALSE);
267                         }
268                     }
269                 }
270             }
271             return true;
272         }
273 
274         public MessageEvent poll() {
275             MessageEvent e = queue.poll();
276             if (e != null) {
277                 int messageSize = getMessageSize(e);
278                 int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
279                 int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
280 
281                 if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
282                     if (newWriteBufferSize + messageSize >= lowWaterMark) {
283                         highWaterMarkCounter.decrementAndGet();
284                         if (isConnected() && setWritable()) {
285                             if (!isIoThread(AbstractNioChannel.this)) {
286                                fireChannelInterestChangedLater(AbstractNioChannel.this);
287                             } else if (!notifying.get()) {
288                                 notifying.set(Boolean.TRUE);
289                                 fireChannelInterestChanged(AbstractNioChannel.this);
290                                 notifying.set(Boolean.FALSE);
291                             }
292                         }
293                     }
294                 }
295             }
296             return e;
297         }
298 
299         private int getMessageSize(MessageEvent e) {
300             Object m = e.getMessage();
301             if (m instanceof ChannelBuffer) {
302                 return ((ChannelBuffer) m).readableBytes();
303             }
304             return 0;
305         }
306     }
307 
308     private final class WriteTask implements Runnable {
309 
310         WriteTask() {
311         }
312 
313         public void run() {
314             writeTaskInTaskQueue.set(false);
315             worker.writeFromTaskLoop(AbstractNioChannel.this);
316         }
317     }
318 
319 }