查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.nio.channels.SelectionKey;
26  import java.nio.channels.Selector;
27  import java.nio.channels.ServerSocketChannel;
28  import java.nio.channels.SocketChannel;
29  import java.util.ArrayList;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Queue;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.Executor;
39  import java.util.concurrent.atomic.AtomicInteger;
40  
41  import org.apache.mina.common.ExceptionMonitor;
42  import org.apache.mina.common.IoAcceptor;
43  import org.apache.mina.common.IoHandler;
44  import org.apache.mina.common.IoServiceConfig;
45  import org.apache.mina.common.support.BaseIoAcceptor;
46  import org.apache.mina.util.NamePreservingRunnable;
47  import org.apache.mina.util.NewThreadExecutor;
48  
49  /**
50   * {@link IoAcceptor} for socket transport (TCP/IP).
51   *
52   * @author The Apache Directory Project (mina-dev@directory.apache.org)
53   * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
54   */
55  public class SocketAcceptor extends BaseIoAcceptor {
56      private static final AtomicInteger nextId = new AtomicInteger();
57  
58      private final Executor executor;
59  
60      private final Object lock = new Object();
61  
62      private final int id = nextId.getAndIncrement();
63  
64      private final String threadName = "SocketAcceptor-" + id;
65  
66      private SocketAcceptorConfig defaultConfig = new SocketAcceptorConfig();
67  
68      private final Map<SocketAddress, ServerSocketChannel> channels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
69  
70      private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
71  
72      private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
73  
74      private final SocketIoProcessor[] ioProcessors;
75  
76      private final int processorCount;
77  
78      private volatile Selector selector;
79  
80      private Worker worker;
81  
82      private int processorDistributor = 0;
83  
84      /**
85       * Create an acceptor with a single processing thread using a NewThreadExecutor
86       */
87      public SocketAcceptor() {
88          this(1, new NewThreadExecutor());
89      }
90  
91      /**
92       * Create an acceptor with the desired number of processing threads
93       *
94       * @param processorCount Number of processing threads
95       * @param executor       Executor to use for launching threads
96       */
97      public SocketAcceptor(int processorCount, Executor executor) {
98          if (processorCount < 1) {
99              throw new IllegalArgumentException(
100                     "Must have at least one processor");
101         }
102 
103         // The default reuseAddress of an accepted socket should be 'true'.
104         defaultConfig.getSessionConfig().setReuseAddress(true);
105 
106         this.executor = executor;
107         this.processorCount = processorCount;
108         ioProcessors = new SocketIoProcessor[processorCount];
109 
110         for (int i = 0; i < processorCount; i++) {
111             ioProcessors[i] = new SocketIoProcessor(
112                     "SocketAcceptorIoProcessor-" + id + "." + i, executor);
113         }
114     }
115 
116     /**
117      * Binds to the specified <code>address</code> and handles incoming connections with the specified
118      * <code>handler</code>.  Backlog value is configured to the value of <code>backlog</code> property.
119      *
120      * @throws IOException if failed to bind
121      */
122     public void bind(SocketAddress address, IoHandler handler,
123             IoServiceConfig config) throws IOException {
124         if (handler == null) {
125             throw new NullPointerException("handler");
126         }
127 
128         if (address != null && !(address instanceof InetSocketAddress)) {
129             throw new IllegalArgumentException("Unexpected address type: "
130                     + address.getClass());
131         }
132 
133         if (config == null) {
134             config = getDefaultConfig();
135         }
136 
137         RegistrationRequest request = new RegistrationRequest(address, handler,
138                 config);
139 
140         synchronized (lock) {
141             startupWorker();
142     
143             registerQueue.add(request);
144     
145             selector.wakeup();
146         }
147 
148         try {
149             request.done.await();
150         } catch (InterruptedException e) {
151             ExceptionMonitor.getInstance().exceptionCaught(e);
152         }
153 
154         if (request.exception != null) {
155             throw request.exception;
156         }
157     }
158 
159     private void startupWorker() throws IOException {
160         synchronized (lock) {
161             if (worker == null) {
162                 selector = Selector.open();
163                 worker = new Worker();
164 
165                 executor.execute(new NamePreservingRunnable(worker, threadName));
166             }
167         }
168     }
169 
170     public void unbind(SocketAddress address) {
171         if (address == null) {
172             throw new NullPointerException("address");
173         }
174 
175         CancellationRequest request = new CancellationRequest(address);
176 
177         synchronized (lock) {
178             try {
179                 startupWorker();
180             } catch (IOException e) {
181                 // IOException is thrown only when Worker thread is not
182                 // running and failed to open a selector.  We simply throw
183                 // IllegalArgumentException here because we can simply
184                 // conclude that nothing is bound to the selector.
185                 throw new IllegalArgumentException("Address not bound: " + address);
186             }
187     
188             cancelQueue.add(request);
189     
190             selector.wakeup();
191         }
192 
193         try {
194             request.done.await();
195         } catch (InterruptedException e) {
196             ExceptionMonitor.getInstance().exceptionCaught(e);
197         }
198 
199         if (request.exception != null) {
200             request.exception.fillInStackTrace();
201 
202             throw request.exception;
203         }
204     }
205 
206     public void unbindAll() {
207         List<SocketAddress> addresses = new ArrayList<SocketAddress>(channels
208                 .keySet());
209 
210         for (SocketAddress address : addresses) {
211             unbind(address);
212         }
213     }
214 
215     private class Worker implements Runnable {
216         public void run() {
217             Selector selector = SocketAcceptor.this.selector;
218             for (;;) {
219                 try {
220                     int nKeys = selector.select();
221 
222                     registerNew();
223 
224                     if (nKeys > 0) {
225                         processSessions(selector.selectedKeys());
226                     }
227 
228                     cancelKeys();
229 
230                     if (selector.keys().isEmpty()) {
231                         synchronized (lock) {
232                             if (selector.keys().isEmpty()
233                                     && registerQueue.isEmpty()
234                                     && cancelQueue.isEmpty()) {
235                                 worker = null;
236                                 try {
237                                     selector.close();
238                                 } catch (IOException e) {
239                                     ExceptionMonitor.getInstance()
240                                             .exceptionCaught(e);
241                                 } finally {
242                                     SocketAcceptor.this.selector = null;
243                                 }
244                                 break;
245                             }
246                         }
247                     }
248                 } catch (IOException e) {
249                     ExceptionMonitor.getInstance().exceptionCaught(e);
250 
251                     try {
252                         Thread.sleep(1000);
253                     } catch (InterruptedException e1) {
254                         ExceptionMonitor.getInstance().exceptionCaught(e1);
255                     }
256                 }
257             }
258         }
259 
260         private void processSessions(Set<SelectionKey> keys) throws IOException {
261             Iterator<SelectionKey> it = keys.iterator();
262             while (it.hasNext()) {
263                 SelectionKey key = it.next();
264 
265                 it.remove();
266 
267                 if (!key.isAcceptable()) {
268                     continue;
269                 }
270 
271                 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
272 
273                 SocketChannel ch = ssc.accept();
274 
275                 if (ch == null) {
276                     continue;
277                 }
278 
279                 boolean success = false;
280                 try {
281                     RegistrationRequest req = (RegistrationRequest) key
282                             .attachment();
283                     SocketSessionImpl session = new SocketSessionImpl(
284                             SocketAcceptor.this, nextProcessor(),
285                             getListeners(), req.config, ch, req.handler,
286                             req.address);
287                     getFilterChainBuilder().buildFilterChain(
288                             session.getFilterChain());
289                     req.config.getFilterChainBuilder().buildFilterChain(
290                             session.getFilterChain());
291                     req.config.getThreadModel().buildFilterChain(
292                             session.getFilterChain());
293                     session.getIoProcessor().addNew(session);
294                     success = true;
295                 } catch (Throwable t) {
296                     ExceptionMonitor.getInstance().exceptionCaught(t);
297                 } finally {
298                     if (!success) {
299                         ch.close();
300                     }
301                 }
302             }
303         }
304     }
305 
306     private SocketIoProcessor nextProcessor() {
307         if (this.processorDistributor == Integer.MAX_VALUE) {
308             this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
309         }
310 
311         return ioProcessors[processorDistributor++ % processorCount];
312     }
313 
314     public SocketAcceptorConfig getDefaultConfig() {
315         return defaultConfig;
316     }
317 
318     /**
319      * Sets the config this acceptor will use by default.
320      *
321      * @param defaultConfig the default config.
322      * @throws NullPointerException if the specified value is <code>null</code>.
323      */
324     public void setDefaultConfig(SocketAcceptorConfig defaultConfig) {
325         if (defaultConfig == null) {
326             throw new NullPointerException("defaultConfig");
327         }
328         this.defaultConfig = defaultConfig;
329     }
330 
331     private void registerNew() {
332         if (registerQueue.isEmpty()) {
333             return;
334         }
335 
336         Selector selector = this.selector;
337         for (;;) {
338             RegistrationRequest req = registerQueue.poll();
339 
340             if (req == null) {
341                 break;
342             }
343 
344             ServerSocketChannel ssc = null;
345 
346             try {
347                 ssc = ServerSocketChannel.open();
348                 ssc.configureBlocking(false);
349 
350                 // Configure the server socket,
351                 SocketAcceptorConfig cfg;
352                 if (req.config instanceof SocketAcceptorConfig) {
353                     cfg = (SocketAcceptorConfig) req.config;
354                 } else {
355                     cfg = getDefaultConfig();
356                 }
357 
358                 ssc.socket().setReuseAddress(cfg.isReuseAddress());
359                 ssc.socket().setReceiveBufferSize(
360                         cfg.getSessionConfig().getReceiveBufferSize());
361 
362                 // and bind.
363                 ssc.socket().bind(req.address, cfg.getBacklog());
364                 if (req.address == null || req.address.getPort() == 0) {
365                     req.address = (InetSocketAddress) ssc.socket()
366                             .getLocalSocketAddress();
367                 }
368                 ssc.register(selector, SelectionKey.OP_ACCEPT, req);
369 
370                 channels.put(req.address, ssc);
371 
372                 getListeners().fireServiceActivated(this, req.address,
373                         req.handler, req.config);
374             } catch (IOException e) {
375                 req.exception = e;
376             } finally {
377                 req.done.countDown();
378 
379                 if (ssc != null && req.exception != null) {
380                     try {
381                         ssc.close();
382                     } catch (IOException e) {
383                         ExceptionMonitor.getInstance().exceptionCaught(e);
384                     }
385                 }
386             }
387         }
388     }
389 
390     private void cancelKeys() {
391         if (cancelQueue.isEmpty()) {
392             return;
393         }
394 
395         Selector selector = this.selector;
396         for (;;) {
397             CancellationRequest request = cancelQueue.poll();
398 
399             if (request == null) {
400                 break;
401             }
402 
403             ServerSocketChannel ssc = channels.remove(request.address);
404 
405             // close the channel
406             try {
407                 if (ssc == null) {
408                     request.exception = new IllegalArgumentException(
409                             "Address not bound: " + request.address);
410                 } else {
411                     SelectionKey key = ssc.keyFor(selector);
412                     request.registrationRequest = (RegistrationRequest) key
413                             .attachment();
414                     key.cancel();
415 
416                     selector.wakeup(); // wake up again to trigger thread death
417 
418                     ssc.close();
419                 }
420             } catch (IOException e) {
421                 ExceptionMonitor.getInstance().exceptionCaught(e);
422             } finally {
423                 request.done.countDown();
424 
425                 if (request.exception == null) {
426                     getListeners().fireServiceDeactivated(this,
427                             request.address,
428                             request.registrationRequest.handler,
429                             request.registrationRequest.config);
430                 }
431             }
432         }
433     }
434 
435     private static class RegistrationRequest {
436         private InetSocketAddress address;
437 
438         private final IoHandler handler;
439 
440         private final IoServiceConfig config;
441 
442         private final CountDownLatch done = new CountDownLatch(1);
443 
444         private volatile IOException exception;
445 
446         private RegistrationRequest(SocketAddress address, IoHandler handler,
447                 IoServiceConfig config) {
448             this.address = (InetSocketAddress) address;
449             this.handler = handler;
450             this.config = config;
451         }
452     }
453 
454     private static class CancellationRequest {
455         private final SocketAddress address;
456 
457         private final CountDownLatch done = new CountDownLatch(1);
458 
459         private RegistrationRequest registrationRequest;
460 
461         private volatile RuntimeException exception;
462 
463         private CancellationRequest(SocketAddress address) {
464             this.address = address;
465         }
466     }
467 }