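// View the API documentation for this class | Back to the source code home page | Instant Messaging Network - the instant messaging developer community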
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.SocketAddress;
23  import java.net.SocketException;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.SocketChannel;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  
29  import org.apache.mina.common.IoFilterChain;
30  import org.apache.mina.common.IoHandler;
31  import org.apache.mina.common.IoService;
32  import org.apache.mina.common.IoServiceConfig;
33  import org.apache.mina.common.IoSession;
34  import org.apache.mina.common.IoSessionConfig;
35  import org.apache.mina.common.RuntimeIOException;
36  import org.apache.mina.common.TransportType;
37  import org.apache.mina.common.IoFilter.WriteRequest;
38  import org.apache.mina.common.support.BaseIoSession;
39  import org.apache.mina.common.support.BaseIoSessionConfig;
40  import org.apache.mina.common.support.IoServiceListenerSupport;
41  
42  /**
43   * An {@link IoSession} for socket transport (TCP/IP).
44   *
45   * @author The Apache Directory Project (mina-dev@directory.apache.org)
46   * @version $Rev: 585050 $, $Date: 2007-10-16 14:57:53 +0900 (Tue, 16 Oct 2007) $
47   */
48  class SocketSessionImpl extends BaseIoSession {
49      private final IoService manager;
50  
51      private final IoServiceConfig serviceConfig;
52  
53      private final SocketSessionConfig config = new SessionConfigImpl();
54  
55      private final SocketIoProcessor ioProcessor;
56  
57      private final SocketFilterChain filterChain;
58  
59      private final SocketChannel ch;
60  
61      private final Queue<WriteRequest> writeRequestQueue;
62  
63      private final IoHandler handler;
64  
65      private final SocketAddress remoteAddress;
66  
67      private final SocketAddress localAddress;
68  
69      private final SocketAddress serviceAddress;
70  
71      private final IoServiceListenerSupport serviceListeners;
72  
73      private SelectionKey key;
74  
75      private int readBufferSize = 1024;
76      private boolean deferDecreaseReadBufferSize = true;
77  
78      /**
79       * Creates a new instance.
80       */
81      SocketSessionImpl(IoService manager, SocketIoProcessor ioProcessor,
82              IoServiceListenerSupport listeners, IoServiceConfig serviceConfig,
83              SocketChannel ch, IoHandler defaultHandler,
84              SocketAddress serviceAddress) {
85          this.manager = manager;
86          this.serviceListeners = listeners;
87          this.ioProcessor = ioProcessor;
88          this.filterChain = new SocketFilterChain(this);
89          this.ch = ch;
90          this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
91          this.handler = defaultHandler;
92          this.remoteAddress = ch.socket().getRemoteSocketAddress();
93          this.localAddress = ch.socket().getLocalSocketAddress();
94          this.serviceAddress = serviceAddress;
95          this.serviceConfig = serviceConfig;
96  
97          // Apply the initial session settings
98          IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
99          if (sessionConfig instanceof SocketSessionConfig) {
100             SocketSessionConfig cfg = (SocketSessionConfig) sessionConfig;
101             this.config.setKeepAlive(cfg.isKeepAlive());
102             this.config.setOobInline(cfg.isOobInline());
103             this.config.setReceiveBufferSize(cfg.getReceiveBufferSize());
104             this.config.setReuseAddress(cfg.isReuseAddress());
105             this.config.setSendBufferSize(cfg.getSendBufferSize());
106             this.config.setSoLinger(cfg.getSoLinger());
107             this.config.setTcpNoDelay(cfg.isTcpNoDelay());
108 
109             if (this.config.getTrafficClass() != cfg.getTrafficClass()) {
110                 this.config.setTrafficClass(cfg.getTrafficClass());
111             }
112         }
113     }
114 
115     public IoService getService() {
116         return manager;
117     }
118 
119     public IoServiceConfig getServiceConfig() {
120         return serviceConfig;
121     }
122 
123     public IoSessionConfig getConfig() {
124         return config;
125     }
126 
127     SocketIoProcessor getIoProcessor() {
128         return ioProcessor;
129     }
130 
131     public IoFilterChain getFilterChain() {
132         return filterChain;
133     }
134 
135     SocketChannel getChannel() {
136         return ch;
137     }
138 
139     IoServiceListenerSupport getServiceListeners() {
140         return serviceListeners;
141     }
142 
143     SelectionKey getSelectionKey() {
144         return key;
145     }
146 
147     void setSelectionKey(SelectionKey key) {
148         this.key = key;
149     }
150 
151     public IoHandler getHandler() {
152         return handler;
153     }
154 
155     @Override
156     protected void close0() {
157         filterChain.fireFilterClose(this);
158     }
159 
160     Queue<WriteRequest> getWriteRequestQueue() {
161         return writeRequestQueue;
162     }
163 
164     @Override
165     protected void write0(WriteRequest writeRequest) {
166         filterChain.fireFilterWrite(this, writeRequest);
167     }
168 
169     public TransportType getTransportType() {
170         return TransportType.SOCKET;
171     }
172 
173     public SocketAddress getRemoteAddress() {
174         return remoteAddress;
175     }
176 
177     public SocketAddress getLocalAddress() {
178         return localAddress;
179     }
180 
181     public SocketAddress getServiceAddress() {
182         return serviceAddress;
183     }
184 
185     @Override
186     protected void updateTrafficMask() {
187         this.ioProcessor.updateTrafficMask(this);
188     }
189 
190     int getReadBufferSize() {
191         return readBufferSize;
192     }
193     
194     void increaseReadBufferSize() {
195         int newReadBufferSize = getReadBufferSize() << 1;
196         if (newReadBufferSize <= ((SocketSessionConfig) getConfig()).getReceiveBufferSize() << 1) {
197             // read buffer size shouldn't get bigger than
198             // twice of the receive buffer size because of
199             // read-write fairness.
200             setReadBufferSize(newReadBufferSize);
201         }
202     }
203     
204     void decreaseReadBufferSize() {
205         if (deferDecreaseReadBufferSize) {
206             deferDecreaseReadBufferSize = false;
207             return;
208         }
209         
210         if (getReadBufferSize() > 64) {
211             setReadBufferSize(getReadBufferSize() >>> 1);
212         }
213     }
214     
215     private void setReadBufferSize(int readBufferSize) {
216         this.readBufferSize = readBufferSize;
217         this.deferDecreaseReadBufferSize = true;
218     }
219 
220     private class SessionConfigImpl extends BaseIoSessionConfig implements
221             SocketSessionConfig {
222         public boolean isKeepAlive() {
223             try {
224                 return ch.socket().getKeepAlive();
225             } catch (SocketException e) {
226                 throw new RuntimeIOException(e);
227             }
228         }
229 
230         public void setKeepAlive(boolean on) {
231             try {
232                 ch.socket().setKeepAlive(on);
233             } catch (SocketException e) {
234                 throw new RuntimeIOException(e);
235             }
236         }
237 
238         public boolean isOobInline() {
239             try {
240                 return ch.socket().getOOBInline();
241             } catch (SocketException e) {
242                 throw new RuntimeIOException(e);
243             }
244         }
245 
246         public void setOobInline(boolean on) {
247             try {
248                 ch.socket().setOOBInline(on);
249             } catch (SocketException e) {
250                 throw new RuntimeIOException(e);
251             }
252         }
253 
254         public boolean isReuseAddress() {
255             try {
256                 return ch.socket().getReuseAddress();
257             } catch (SocketException e) {
258                 throw new RuntimeIOException(e);
259             }
260         }
261 
262         public void setReuseAddress(boolean on) {
263             try {
264                 ch.socket().setReuseAddress(on);
265             } catch (SocketException e) {
266                 throw new RuntimeIOException(e);
267             }
268         }
269 
270         public int getSoLinger() {
271             try {
272                 return ch.socket().getSoLinger();
273             } catch (SocketException e) {
274                 throw new RuntimeIOException(e);
275             }
276         }
277 
278         public void setSoLinger(int linger) {
279             try {
280                 if (linger < 0) {
281                     ch.socket().setSoLinger(false, 0);
282                 } else {
283                     ch.socket().setSoLinger(true, linger);
284                 }
285             } catch (SocketException e) {
286                 throw new RuntimeIOException(e);
287             }
288         }
289 
290         public boolean isTcpNoDelay() {
291             try {
292                 return ch.socket().getTcpNoDelay();
293             } catch (SocketException e) {
294                 throw new RuntimeIOException(e);
295             }
296         }
297 
298         public void setTcpNoDelay(boolean on) {
299             try {
300                 ch.socket().setTcpNoDelay(on);
301             } catch (SocketException e) {
302                 throw new RuntimeIOException(e);
303             }
304         }
305 
306         public int getTrafficClass() {
307             if (SocketSessionConfigImpl.isGetTrafficClassAvailable()) {
308                 try {
309                     return ch.socket().getTrafficClass();
310                 } catch (SocketException e) {
311                     // Throw an exception only when setTrafficClass is also available.
312                     if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) {
313                         throw new RuntimeIOException(e);
314                     }
315                 }
316             }
317 
318             return 0;
319         }
320 
321         public void setTrafficClass(int tc) {
322             if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) {
323                 try {
324                     ch.socket().setTrafficClass(tc);
325                 } catch (SocketException e) {
326                     throw new RuntimeIOException(e);
327                 }
328             }
329         }
330 
331         public int getSendBufferSize() {
332             try {
333                 return ch.socket().getSendBufferSize();
334             } catch (SocketException e) {
335                 throw new RuntimeIOException(e);
336             }
337         }
338 
339         public void setSendBufferSize(int size) {
340             if (SocketSessionConfigImpl.isSetSendBufferSizeAvailable()) {
341                 try {
342                     ch.socket().setSendBufferSize(size);
343                 } catch (SocketException e) {
344                     throw new RuntimeIOException(e);
345                 }
346             }
347         }
348 
349         public int getReceiveBufferSize() {
350             try {
351                 return ch.socket().getReceiveBufferSize();
352             } catch (SocketException e) {
353                 throw new RuntimeIOException(e);
354             }
355         }
356 
357         public void setReceiveBufferSize(int size) {
358             if (SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable()) {
359                 try {
360                     ch.socket().setReceiveBufferSize(size);
361                 } catch (SocketException e) {
362                     throw new RuntimeIOException(e);
363                 }
364             }
365         }
366     }
367 }