1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.nio;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.nio.channels.SelectionKey;
26 import java.nio.channels.Selector;
27 import java.nio.channels.ServerSocketChannel;
28 import java.nio.channels.SocketChannel;
29 import java.util.ArrayList;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Queue;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.atomic.AtomicInteger;
40
41 import org.apache.mina.common.ExceptionMonitor;
42 import org.apache.mina.common.IoAcceptor;
43 import org.apache.mina.common.IoHandler;
44 import org.apache.mina.common.IoServiceConfig;
45 import org.apache.mina.common.support.BaseIoAcceptor;
46 import org.apache.mina.util.NamePreservingRunnable;
47 import org.apache.mina.util.NewThreadExecutor;
48
49
50
51
52
53
54
55 public class SocketAcceptor extends BaseIoAcceptor {
56 private static final AtomicInteger nextId = new AtomicInteger();
57
58 private final Executor executor;
59
60 private final Object lock = new Object();
61
62 private final int id = nextId.getAndIncrement();
63
64 private final String threadName = "SocketAcceptor-" + id;
65
66 private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();
67
68 private final Map<SocketAddress, ServerSocketChannel> channels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
69
70 private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
71
72 private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
73
74 private final SocketIoProcessor[] ioProcessors;
75
76 private final int processorCount;
77
78 private volatile Selector selector;
79
80 private Worker worker;
81
82 private int processorDistributor = 0;
83
84
85
86
/**
 * Creates an acceptor that uses a single I/O processor and a
 * {@link NewThreadExecutor}.
 */
public SocketAcceptor() {
    this(1, new NewThreadExecutor());
}
90
91
92
93
94
95
96
/**
 * Creates an acceptor with the given number of I/O processors.
 *
 * @param processorCount number of {@link SocketIoProcessor}s to create; must be at least 1
 * @param executor       executor used to run the acceptor worker; it is also passed
 *                       to every created I/O processor
 * @throws IllegalArgumentException if {@code processorCount} is less than 1
 * @throws NullPointerException     if {@code executor} is {@code null}
 */
public SocketAcceptor(int processorCount, Executor executor) {
    if (processorCount < 1) {
        throw new IllegalArgumentException(
                "Must have at least one processor");
    }

    // Fail fast: a null executor would otherwise only blow up at the first
    // bind(), after a selector has already been opened.
    if (executor == null) {
        throw new NullPointerException("executor");
    }

    // The default configuration of an acceptor reuses the local address.
    defaultConfig.getSessionConfig().setReuseAddress(true);

    this.executor = executor;
    this.processorCount = processorCount;
    ioProcessors = new SocketIoProcessor[processorCount];

    for (int i = 0; i < processorCount; i++) {
        ioProcessors[i] = new SocketIoProcessor(
                "SocketAcceptorIoProcessor-" + id + "." + i, executor);
    }
}
115
116
117
118
119
120
121
122 public void bind(SocketAddress address, IoHandler handler,
123 IoServiceConfig config) throws IOException {
124 if (handler == null) {
125 throw new NullPointerException("handler");
126 }
127
128 if (address != null && !(address instanceof InetSocketAddress)) {
129 throw new IllegalArgumentException("Unexpected address type: "
130 + address.getClass());
131 }
132
133 if (config == null) {
134 config = getDefaultConfig();
135 }
136
137 RegistrationRequest request = new RegistrationRequest(address, handler,
138 config);
139
140 synchronized (lock) {
141 startupWorker();
142
143 registerQueue.add(request);
144
145 selector.wakeup();
146 }
147
148 try {
149 request.done.await();
150 } catch (InterruptedException e) {
151 ExceptionMonitor.getInstance().exceptionCaught(e);
152 }
153
154 if (request.exception != null) {
155 throw request.exception;
156 }
157 }
158
159 private void startupWorker() throws IOException {
160 synchronized (lock) {
161 if (worker == null) {
162 selector = Selector.open();
163 worker = new Worker();
164
165 executor.execute(new NamePreservingRunnable(worker, threadName));
166 }
167 }
168 }
169
170 public void unbind(SocketAddress address) {
171 if (address == null) {
172 throw new NullPointerException("address");
173 }
174
175 CancellationRequest request = new CancellationRequest(address);
176
177 synchronized (lock) {
178 try {
179 startupWorker();
180 } catch (IOException e) {
181
182
183
184
185 throw new IllegalArgumentException("Address not bound: " + address);
186 }
187
188 cancelQueue.add(request);
189
190 selector.wakeup();
191 }
192
193 try {
194 request.done.await();
195 } catch (InterruptedException e) {
196 ExceptionMonitor.getInstance().exceptionCaught(e);
197 }
198
199 if (request.exception != null) {
200 request.exception.fillInStackTrace();
201
202 throw request.exception;
203 }
204 }
205
206 public void unbindAll() {
207 List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels
208 .keySet());
209
210 for (SocketAddress address : addresses) {
211 unbind(address);
212 }
213 }
214
215 private class Worker implements Runnable {
216 public void run() {
217 Selector selector = SocketAcceptor.this.selector;
218 for (;;) {
219 try {
220 int nKeys = selector.select();
221
222 registerNew();
223
224 if (nKeys > 0) {
225 processSessions(selector.selectedKeys());
226 }
227
228 cancelKeys();
229
230 if (selector.keys().isEmpty()) {
231 synchronized (lock) {
232 if (selector.keys().isEmpty()
233 && registerQueue.isEmpty()
234 && cancelQueue.isEmpty()) {
235 worker = null;
236 try {
237 selector.close();
238 } catch (IOException e) {
239 ExceptionMonitor.getInstance()
240 .exceptionCaught(e);
241 } finally {
242 SocketAcceptor.this.selector = null;
243 }
244 break;
245 }
246 }
247 }
248 } catch (IOException e) {
249 ExceptionMonitor.getInstance().exceptionCaught(e);
250
251 try {
252 Thread.sleep(1000);
253 } catch (InterruptedException e1) {
254 ExceptionMonitor.getInstance().exceptionCaught(e1);
255 }
256 }
257 }
258 }
259
260 private void processSessions(Set<SelectionKey> keys) throws IOException {
261 Iterator<SelectionKey> it = keys.iterator();
262 while (it.hasNext()) {
263 SelectionKey key = it.next();
264
265 it.remove();
266
267 if (!key.isAcceptable()) {
268 continue;
269 }
270
271 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
272
273 SocketChannel ch = ssc.accept();
274
275 if (ch == null) {
276 continue;
277 }
278
279 boolean success = false;
280 try {
281 RegistrationRequest req = (RegistrationRequest) key
282 .attachment();
283 SocketSessionImpl session = new SocketSessionImpl(
284 SocketAcceptor.this, nextProcessor(),
285 getListeners(), req.config, ch, req.handler,
286 req.address);
287 getFilterChainBuilder().buildFilterChain(
288 session.getFilterChain());
289 req.config.getFilterChainBuilder().buildFilterChain(
290 session.getFilterChain());
291 req.config.getThreadModel().buildFilterChain(
292 session.getFilterChain());
293 session.getIoProcessor().addNew(session);
294 success = true;
295 } catch (Throwable t) {
296 ExceptionMonitor.getInstance().exceptionCaught(t);
297 } finally {
298 if (!success) {
299 ch.close();
300 }
301 }
302 }
303 }
304 }
305
306 private SocketIoProcessor nextProcessor() {
307 if (this.processorDistributor == Integer.MAX_VALUE) {
308 this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
309 }
310
311 return ioProcessors[processorDistributor++ % processorCount];
312 }
313
314 public SocketAcceptorConfig getDefaultConfig() {
315 return defaultConfig;
316 }
317
318
319
320
321
322
323
324 public void setDefaultConfig(SocketAcceptorConfig defaultConfig) {
325 if (defaultConfig == null) {
326 throw new NullPointerException("defaultConfig");
327 }
328 this.defaultConfig = defaultConfig;
329 }
330
331 private void registerNew() {
332 if (registerQueue.isEmpty()) {
333 return;
334 }
335
336 Selector selector = this.selector;
337 for (;;) {
338 RegistrationRequest req = registerQueue.poll();
339
340 if (req == null) {
341 break;
342 }
343
344 ServerSocketChannel ssc = null;
345
346 try {
347 ssc = ServerSocketChannel.open();
348 ssc.configureBlocking(false);
349
350
351 SocketAcceptorConfig cfg;
352 if (req.config instanceof SocketAcceptorConfig) {
353 cfg = (SocketAcceptorConfig) req.config;
354 } else {
355 cfg = getDefaultConfig();
356 }
357
358 ssc.socket().setReuseAddress(cfg.isReuseAddress());
359 ssc.socket().setReceiveBufferSize(
360 cfg.getSessionConfig().getReceiveBufferSize());
361
362
363 ssc.socket().bind(req.address, cfg.getBacklog());
364 if (req.address == null || req.address.getPort() == 0) {
365 req.address = (InetSocketAddress) ssc.socket()
366 .getLocalSocketAddress();
367 }
368 ssc.register(selector, SelectionKey.OP_ACCEPT, req);
369
370 channels.put(req.address, ssc);
371
372 getListeners().fireServiceActivated(this, req.address,
373 req.handler, req.config);
374 } catch (IOException e) {
375 req.exception = e;
376 } finally {
377 req.done.countDown();
378
379 if (ssc != null && req.exception != null) {
380 try {
381 ssc.close();
382 } catch (IOException e) {
383 ExceptionMonitor.getInstance().exceptionCaught(e);
384 }
385 }
386 }
387 }
388 }
389
390 private void cancelKeys() {
391 if (cancelQueue.isEmpty()) {
392 return;
393 }
394
395 Selector selector = this.selector;
396 for (;;) {
397 CancellationRequest request = cancelQueue.poll();
398
399 if (request == null) {
400 break;
401 }
402
403 ServerSocketChannel ssc = channels.remove(request.address);
404
405
406 try {
407 if (ssc == null) {
408 request.exception = new IllegalArgumentException(
409 "Address not bound: " + request.address);
410 } else {
411 SelectionKey key = ssc.keyFor(selector);
412 request.registrationRequest = (RegistrationRequest) key
413 .attachment();
414 key.cancel();
415
416 selector.wakeup();
417
418 ssc.close();
419 }
420 } catch (IOException e) {
421 ExceptionMonitor.getInstance().exceptionCaught(e);
422 } finally {
423 request.done.countDown();
424
425 if (request.exception == null) {
426 getListeners().fireServiceDeactivated(this,
427 request.address,
428 request.registrationRequest.handler,
429 request.registrationRequest.config);
430 }
431 }
432 }
433 }
434
435 private static class RegistrationRequest {
436 private InetSocketAddress address;
437
438 private final IoHandler handler;
439
440 private final IoServiceConfig config;
441
442 private final CountDownLatch done = new CountDownLatch(1);
443
444 private volatile IOException exception;
445
446 private RegistrationRequest(SocketAddress address, IoHandler handler,
447 IoServiceConfig config) {
448 this.address = (InetSocketAddress) address;
449 this.handler = handler;
450 this.config = config;
451 }
452 }
453
454 private static class CancellationRequest {
455 private final SocketAddress address;
456
457 private final CountDownLatch done = new CountDownLatch(1);
458
459 private RegistrationRequest registrationRequest;
460
461 private volatile RuntimeException exception;
462
463 private CancellationRequest(SocketAddress address) {
464 this.address = address;
465 }
466 }
467 }