1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.net.SocketAddress;
23 import java.net.SocketException;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.SocketChannel;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28
29 import org.apache.mina.common.IoFilterChain;
30 import org.apache.mina.common.IoHandler;
31 import org.apache.mina.common.IoService;
32 import org.apache.mina.common.IoServiceConfig;
33 import org.apache.mina.common.IoSession;
34 import org.apache.mina.common.IoSessionConfig;
35 import org.apache.mina.common.RuntimeIOException;
36 import org.apache.mina.common.TransportType;
37 import org.apache.mina.common.IoFilter.WriteRequest;
38 import org.apache.mina.common.support.BaseIoSession;
39 import org.apache.mina.common.support.BaseIoSessionConfig;
40 import org.apache.mina.common.support.IoServiceListenerSupport;
41
42
43
44
45
46
47
48 class SocketSessionImpl extends BaseIoSession {
49 private final IoService manager;
50
51 private final IoServiceConfig serviceConfig;
52
53 private final SocketSessionConfig config = new SessionConfigImpl();
54
55 private final SocketIoProcessor ioProcessor;
56
57 private final SocketFilterChain filterChain;
58
59 private final SocketChannel ch;
60
61 private final Queue<WriteRequest> writeRequestQueue;
62
63 private final IoHandler handler;
64
65 private final SocketAddress remoteAddress;
66
67 private final SocketAddress localAddress;
68
69 private final SocketAddress serviceAddress;
70
71 private final IoServiceListenerSupport serviceListeners;
72
73 private SelectionKey key;
74
75 private int readBufferSize = 1024;
76 private boolean deferDecreaseReadBufferSize = true;
77
78
79
80
81 SocketSessionImpl(IoService manager, SocketIoProcessor ioProcessor,
82 IoServiceListenerSupport listeners, IoServiceConfig serviceConfig,
83 SocketChannel ch, IoHandler defaultHandler,
84 SocketAddress serviceAddress) {
85 this.manager = manager;
86 this.serviceListeners = listeners;
87 this.ioProcessor = ioProcessor;
88 this.filterChain = new SocketFilterChain(this);
89 this.ch = ch;
90 this.writeRequestQueue = new ConcurrentLinkedQueue<WriteRequest>();
91 this.handler = defaultHandler;
92 this.remoteAddress = ch.socket().getRemoteSocketAddress();
93 this.localAddress = ch.socket().getLocalSocketAddress();
94 this.serviceAddress = serviceAddress;
95 this.serviceConfig = serviceConfig;
96
97
98 IoSessionConfig sessionConfig = serviceConfig.getSessionConfig();
99 if (sessionConfig instanceof SocketSessionConfig) {
100 SocketSessionConfig cfg = (SocketSessionConfig) sessionConfig;
101 this.config.setKeepAlive(cfg.isKeepAlive());
102 this.config.setOobInline(cfg.isOobInline());
103 this.config.setReceiveBufferSize(cfg.getReceiveBufferSize());
104 this.config.setReuseAddress(cfg.isReuseAddress());
105 this.config.setSendBufferSize(cfg.getSendBufferSize());
106 this.config.setSoLinger(cfg.getSoLinger());
107 this.config.setTcpNoDelay(cfg.isTcpNoDelay());
108
109 if (this.config.getTrafficClass() != cfg.getTrafficClass()) {
110 this.config.setTrafficClass(cfg.getTrafficClass());
111 }
112 }
113 }
114
115 public IoService getService() {
116 return manager;
117 }
118
119 public IoServiceConfig getServiceConfig() {
120 return serviceConfig;
121 }
122
123 public IoSessionConfig getConfig() {
124 return config;
125 }
126
127 SocketIoProcessor getIoProcessor() {
128 return ioProcessor;
129 }
130
131 public IoFilterChain getFilterChain() {
132 return filterChain;
133 }
134
135 SocketChannel getChannel() {
136 return ch;
137 }
138
139 IoServiceListenerSupport getServiceListeners() {
140 return serviceListeners;
141 }
142
143 SelectionKey getSelectionKey() {
144 return key;
145 }
146
147 void setSelectionKey(SelectionKey key) {
148 this.key = key;
149 }
150
151 public IoHandler getHandler() {
152 return handler;
153 }
154
155 @Override
156 protected void close0() {
157 filterChain.fireFilterClose(this);
158 }
159
160 Queue<WriteRequest> getWriteRequestQueue() {
161 return writeRequestQueue;
162 }
163
164 @Override
165 protected void write0(WriteRequest writeRequest) {
166 filterChain.fireFilterWrite(this, writeRequest);
167 }
168
169 public TransportType getTransportType() {
170 return TransportType.SOCKET;
171 }
172
173 public SocketAddress getRemoteAddress() {
174 return remoteAddress;
175 }
176
177 public SocketAddress getLocalAddress() {
178 return localAddress;
179 }
180
181 public SocketAddress getServiceAddress() {
182 return serviceAddress;
183 }
184
185 @Override
186 protected void updateTrafficMask() {
187 this.ioProcessor.updateTrafficMask(this);
188 }
189
190 int getReadBufferSize() {
191 return readBufferSize;
192 }
193
194 void increaseReadBufferSize() {
195 int newReadBufferSize = getReadBufferSize() << 1;
196 if (newReadBufferSize <= ((SocketSessionConfig) getConfig()).getReceiveBufferSize() << 1) {
197
198
199
200 setReadBufferSize(newReadBufferSize);
201 }
202 }
203
204 void decreaseReadBufferSize() {
205 if (deferDecreaseReadBufferSize) {
206 deferDecreaseReadBufferSize = false;
207 return;
208 }
209
210 if (getReadBufferSize() > 64) {
211 setReadBufferSize(getReadBufferSize() >>> 1);
212 }
213 }
214
215 private void setReadBufferSize(int readBufferSize) {
216 this.readBufferSize = readBufferSize;
217 this.deferDecreaseReadBufferSize = true;
218 }
219
220 private class SessionConfigImpl extends BaseIoSessionConfig implements
221 SocketSessionConfig {
222 public boolean isKeepAlive() {
223 try {
224 return ch.socket().getKeepAlive();
225 } catch (SocketException e) {
226 throw new RuntimeIOException(e);
227 }
228 }
229
230 public void setKeepAlive(boolean on) {
231 try {
232 ch.socket().setKeepAlive(on);
233 } catch (SocketException e) {
234 throw new RuntimeIOException(e);
235 }
236 }
237
238 public boolean isOobInline() {
239 try {
240 return ch.socket().getOOBInline();
241 } catch (SocketException e) {
242 throw new RuntimeIOException(e);
243 }
244 }
245
246 public void setOobInline(boolean on) {
247 try {
248 ch.socket().setOOBInline(on);
249 } catch (SocketException e) {
250 throw new RuntimeIOException(e);
251 }
252 }
253
254 public boolean isReuseAddress() {
255 try {
256 return ch.socket().getReuseAddress();
257 } catch (SocketException e) {
258 throw new RuntimeIOException(e);
259 }
260 }
261
262 public void setReuseAddress(boolean on) {
263 try {
264 ch.socket().setReuseAddress(on);
265 } catch (SocketException e) {
266 throw new RuntimeIOException(e);
267 }
268 }
269
270 public int getSoLinger() {
271 try {
272 return ch.socket().getSoLinger();
273 } catch (SocketException e) {
274 throw new RuntimeIOException(e);
275 }
276 }
277
278 public void setSoLinger(int linger) {
279 try {
280 if (linger < 0) {
281 ch.socket().setSoLinger(false, 0);
282 } else {
283 ch.socket().setSoLinger(true, linger);
284 }
285 } catch (SocketException e) {
286 throw new RuntimeIOException(e);
287 }
288 }
289
290 public boolean isTcpNoDelay() {
291 try {
292 return ch.socket().getTcpNoDelay();
293 } catch (SocketException e) {
294 throw new RuntimeIOException(e);
295 }
296 }
297
298 public void setTcpNoDelay(boolean on) {
299 try {
300 ch.socket().setTcpNoDelay(on);
301 } catch (SocketException e) {
302 throw new RuntimeIOException(e);
303 }
304 }
305
306 public int getTrafficClass() {
307 if (SocketSessionConfigImpl.isGetTrafficClassAvailable()) {
308 try {
309 return ch.socket().getTrafficClass();
310 } catch (SocketException e) {
311
312 if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) {
313 throw new RuntimeIOException(e);
314 }
315 }
316 }
317
318 return 0;
319 }
320
321 public void setTrafficClass(int tc) {
322 if (SocketSessionConfigImpl.isSetTrafficClassAvailable()) {
323 try {
324 ch.socket().setTrafficClass(tc);
325 } catch (SocketException e) {
326 throw new RuntimeIOException(e);
327 }
328 }
329 }
330
331 public int getSendBufferSize() {
332 try {
333 return ch.socket().getSendBufferSize();
334 } catch (SocketException e) {
335 throw new RuntimeIOException(e);
336 }
337 }
338
339 public void setSendBufferSize(int size) {
340 if (SocketSessionConfigImpl.isSetSendBufferSizeAvailable()) {
341 try {
342 ch.socket().setSendBufferSize(size);
343 } catch (SocketException e) {
344 throw new RuntimeIOException(e);
345 }
346 }
347 }
348
349 public int getReceiveBufferSize() {
350 try {
351 return ch.socket().getReceiveBufferSize();
352 } catch (SocketException e) {
353 throw new RuntimeIOException(e);
354 }
355 }
356
357 public void setReceiveBufferSize(int size) {
358 if (SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable()) {
359 try {
360 ch.socket().setReceiveBufferSize(size);
361 } catch (SocketException e) {
362 throw new RuntimeIOException(e);
363 }
364 }
365 }
366 }
367 }