1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.ConnectException;
24 import java.net.InetSocketAddress;
25 import java.net.SocketAddress;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.nio.channels.SocketChannel;
29 import java.util.Queue;
30 import java.util.Set;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32 import java.util.concurrent.Executor;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.apache.mina.common.ConnectFuture;
36 import org.apache.mina.common.ExceptionMonitor;
37 import org.apache.mina.common.IoConnector;
38 import org.apache.mina.common.IoConnectorConfig;
39 import org.apache.mina.common.IoHandler;
40 import org.apache.mina.common.IoServiceConfig;
41 import org.apache.mina.common.support.AbstractIoFilterChain;
42 import org.apache.mina.common.support.BaseIoConnector;
43 import org.apache.mina.common.support.DefaultConnectFuture;
44 import org.apache.mina.util.NamePreservingRunnable;
45 import org.apache.mina.util.NewThreadExecutor;
46
47
48
49
50
51
52
53 public class SocketConnector extends BaseIoConnector {
54 private static final AtomicInteger nextId = new AtomicInteger();
55
56 private final Object lock = new Object();
57
58 private final int id = nextId.getAndIncrement();
59
60 private final String threadName = "SocketConnector-" + id;
61
62 private SocketConnectorConfig defaultConfig = new SocketConnectorConfig();
63
64 private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
65
66 private final SocketIoProcessor[] ioProcessors;
67
68 private final int processorCount;
69
70 private final Executor executor;
71
72 private volatile Selector selector;
73
74 private Worker worker;
75
76 private int processorDistributor = 0;
77
78 private int workerTimeout = 60;
79
80
81
82
83 public SocketConnector() {
84 this(1, new NewThreadExecutor());
85 }
86
87
88
89
90
91
92
93 public SocketConnector(int processorCount, Executor executor) {
94 if (processorCount < 1) {
95 throw new IllegalArgumentException(
96 "Must have at least one processor");
97 }
98
99 this.executor = executor;
100 this.processorCount = processorCount;
101 ioProcessors = new SocketIoProcessor[processorCount];
102
103 for (int i = 0; i < processorCount; i++) {
104 ioProcessors[i] = new SocketIoProcessor(
105 "SocketConnectorIoProcessor-" + id + "." + i, executor);
106 }
107 }
108
109
110
111
112
113
114
115
116 public int getWorkerTimeout() {
117 return workerTimeout;
118 }
119
120
121
122
123
124
125
126
127
128 public void setWorkerTimeout(int workerTimeout) {
129 if (workerTimeout < 0) {
130 throw new IllegalArgumentException("Must be >= 0");
131 }
132 this.workerTimeout = workerTimeout;
133 }
134
135 public ConnectFuture connect(SocketAddress address, IoHandler handler,
136 IoServiceConfig config) {
137 return connect(address, null, handler, config);
138 }
139
140 public ConnectFuture connect(SocketAddress address,
141 SocketAddress localAddress, IoHandler handler,
142 IoServiceConfig config) {
143 if (address == null) {
144 throw new NullPointerException("address");
145 }
146 if (handler == null) {
147 throw new NullPointerException("handler");
148 }
149
150 if (!(address instanceof InetSocketAddress)) {
151 throw new IllegalArgumentException("Unexpected address type: "
152 + address.getClass());
153 }
154
155 if (localAddress != null
156 && !(localAddress instanceof InetSocketAddress)) {
157 throw new IllegalArgumentException(
158 "Unexpected local address type: " + localAddress.getClass());
159 }
160
161 if (config == null) {
162 config = getDefaultConfig();
163 }
164
165 SocketChannel ch = null;
166 boolean success = false;
167 try {
168 ch = SocketChannel.open();
169 ch.socket().setReuseAddress(true);
170
171
172
173 if (config instanceof SocketConnectorConfig) {
174 int receiveBufferSize =
175 ((SocketSessionConfig) config.getSessionConfig()).getReceiveBufferSize();
176 if (receiveBufferSize > 65535) {
177 ch.socket().setReceiveBufferSize(receiveBufferSize);
178 }
179 }
180
181 if (localAddress != null) {
182 ch.socket().bind(localAddress);
183 }
184
185 ch.configureBlocking(false);
186
187 if (ch.connect(address)) {
188 DefaultConnectFuture future = new DefaultConnectFuture();
189 newSession(ch, handler, config, future);
190 success = true;
191 return future;
192 }
193
194 success = true;
195 } catch (IOException e) {
196 return DefaultConnectFuture.newFailedFuture(e);
197 } finally {
198 if (!success && ch != null) {
199 try {
200 ch.close();
201 } catch (IOException e) {
202 ExceptionMonitor.getInstance().exceptionCaught(e);
203 }
204 }
205 }
206
207 ConnectionRequest request = new ConnectionRequest(ch, handler, config);
208 synchronized (lock) {
209 try {
210 startupWorker();
211 } catch (IOException e) {
212 try {
213 ch.close();
214 } catch (IOException e2) {
215 ExceptionMonitor.getInstance().exceptionCaught(e2);
216 }
217
218 return DefaultConnectFuture.newFailedFuture(e);
219 }
220
221 connectQueue.add(request);
222 selector.wakeup();
223 }
224
225 return request;
226 }
227
228 public SocketConnectorConfig getDefaultConfig() {
229 return defaultConfig;
230 }
231
232
233
234
235
236
237
238 public void setDefaultConfig(SocketConnectorConfig defaultConfig) {
239 if (defaultConfig == null) {
240 throw new NullPointerException("defaultConfig");
241 }
242 this.defaultConfig = defaultConfig;
243 }
244
245 private void startupWorker() throws IOException {
246 synchronized (lock) {
247 if (worker == null) {
248 selector = Selector.open();
249 worker = new Worker();
250 executor.execute(new NamePreservingRunnable(worker, threadName));
251 }
252 }
253 }
254
255 private void registerNew() {
256 if (connectQueue.isEmpty()) {
257 return;
258 }
259
260 Selector selector = this.selector;
261 for (;;) {
262 ConnectionRequest req = connectQueue.poll();
263
264 if (req == null) {
265 break;
266 }
267
268 SocketChannel ch = req.channel;
269 try {
270 ch.register(selector, SelectionKey.OP_CONNECT, req);
271 } catch (IOException e) {
272 req.setException(e);
273 try {
274 ch.close();
275 } catch (IOException e2) {
276 ExceptionMonitor.getInstance().exceptionCaught(e2);
277 }
278 }
279 }
280 }
281
282 private void processSessions(Set<SelectionKey> keys) {
283 for (SelectionKey key : keys) {
284 if (!key.isConnectable()) {
285 continue;
286 }
287
288 SocketChannel ch = (SocketChannel) key.channel();
289 ConnectionRequest entry = (ConnectionRequest) key.attachment();
290
291 boolean success = false;
292 try {
293 if (ch.finishConnect()) {
294 key.cancel();
295 newSession(ch, entry.handler, entry.config, entry);
296 }
297 success = true;
298 } catch (Throwable e) {
299 entry.setException(e);
300 } finally {
301 if (!success) {
302 key.cancel();
303 try {
304 ch.close();
305 } catch (IOException e) {
306 ExceptionMonitor.getInstance().exceptionCaught(e);
307 }
308 }
309 }
310 }
311
312 keys.clear();
313 }
314
315 private void processTimedOutSessions(Set<SelectionKey> keys) {
316 long currentTime = System.currentTimeMillis();
317
318 for (SelectionKey key : keys) {
319 if (!key.isValid()) {
320 continue;
321 }
322
323 ConnectionRequest entry = (ConnectionRequest) key.attachment();
324
325 if (currentTime >= entry.deadline) {
326 entry.setException(new ConnectException());
327 try {
328 key.channel().close();
329 } catch (IOException e) {
330 ExceptionMonitor.getInstance().exceptionCaught(e);
331 } finally {
332 key.cancel();
333 }
334 }
335 }
336 }
337
338 private void newSession(SocketChannel ch, IoHandler handler,
339 IoServiceConfig config, ConnectFuture connectFuture)
340 throws IOException {
341 SocketSessionImpl session = new SocketSessionImpl(this,
342 nextProcessor(), getListeners(), config, ch, handler, ch
343 .socket().getRemoteSocketAddress());
344 try {
345 getFilterChainBuilder().buildFilterChain(session.getFilterChain());
346 config.getFilterChainBuilder().buildFilterChain(
347 session.getFilterChain());
348 config.getThreadModel().buildFilterChain(session.getFilterChain());
349 } catch (Throwable e) {
350 throw (IOException) new IOException("Failed to create a session.")
351 .initCause(e);
352 }
353
354
355
356 session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE,
357 connectFuture);
358
359
360 session.getIoProcessor().addNew(session);
361 }
362
363 private SocketIoProcessor nextProcessor() {
364 if (this.processorDistributor == Integer.MAX_VALUE) {
365 this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
366 }
367
368 return ioProcessors[processorDistributor++ % processorCount];
369 }
370
371 private class Worker implements Runnable {
372 private long lastActive = System.currentTimeMillis();
373
374 public void run() {
375 Selector selector = SocketConnector.this.selector;
376 for (;;) {
377 try {
378 int nKeys = selector.select(1000);
379
380 registerNew();
381
382 if (nKeys > 0) {
383 processSessions(selector.selectedKeys());
384 }
385
386 processTimedOutSessions(selector.keys());
387
388 if (selector.keys().isEmpty()) {
389 if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) {
390 synchronized (lock) {
391 if (selector.keys().isEmpty()
392 && connectQueue.isEmpty()) {
393 worker = null;
394 try {
395 selector.close();
396 } catch (IOException e) {
397 ExceptionMonitor.getInstance()
398 .exceptionCaught(e);
399 } finally {
400 SocketConnector.this.selector = null;
401 }
402 break;
403 }
404 }
405 }
406 } else {
407 lastActive = System.currentTimeMillis();
408 }
409 } catch (IOException e) {
410 ExceptionMonitor.getInstance().exceptionCaught(e);
411
412 try {
413 Thread.sleep(1000);
414 } catch (InterruptedException e1) {
415 ExceptionMonitor.getInstance().exceptionCaught(e1);
416 }
417 }
418 }
419 }
420 }
421
422 private class ConnectionRequest extends DefaultConnectFuture {
423 private final SocketChannel channel;
424
425 private final long deadline;
426
427 private final IoHandler handler;
428
429 private final IoServiceConfig config;
430
431 private ConnectionRequest(SocketChannel channel, IoHandler handler,
432 IoServiceConfig config) {
433 this.channel = channel;
434 long timeout;
435 if (config instanceof IoConnectorConfig) {
436 timeout = ((IoConnectorConfig) config)
437 .getConnectTimeoutMillis();
438 } else {
439 timeout = ((IoConnectorConfig) getDefaultConfig())
440 .getConnectTimeoutMillis();
441 }
442 this.deadline = System.currentTimeMillis() + timeout;
443 this.handler = handler;
444 this.config = config;
445 }
446 }
447 }