查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.logging.InternalLogger;
19  import org.jboss.netty.logging.InternalLoggerFactory;
20  import org.jboss.netty.util.ExternalResourceReleasable;
21  import org.jboss.netty.util.internal.ExecutorUtil;
22  
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  public abstract class AbstractNioBossPool<E extends Boss>
29          implements BossPool<E>, ExternalResourceReleasable {
30  
31      /**
32       * The boss pool raises an exception unless all boss threads start and run within this timeout (in seconds.)
33       */
34      private static final int INITIALIZATION_TIMEOUT = 10;
35  
36      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioBossPool.class);
37  
38      private final Boss[] bosses;
39      private final AtomicInteger bossIndex = new AtomicInteger();
40      private final Executor bossExecutor;
41      private final AtomicBoolean initialized = new AtomicBoolean(false);
42  
43      /**
44       * Create a new instance
45       *
46       * @param bossExecutor the {@link Executor} to use for the {@link Boss}'s
47       * @param bossCount the count of {@link Boss}'s to create
48       */
49      AbstractNioBossPool(Executor bossExecutor, int bossCount) {
50          this(bossExecutor, bossCount, true);
51      }
52  
53      AbstractNioBossPool(Executor bossExecutor, int bossCount, boolean autoInit) {
54          if (bossExecutor == null) {
55              throw new NullPointerException("bossExecutor");
56          }
57          if (bossCount <= 0) {
58              throw new IllegalArgumentException(
59                      "bossCount (" + bossCount + ") " +
60                              "must be a positive integer.");
61          }
62          bosses = new Boss[bossCount];
63          this.bossExecutor = bossExecutor;
64          if (autoInit) {
65              init();
66          }
67      }
68  
69      protected void init() {
70          if (!initialized.compareAndSet(false, true)) {
71              throw new IllegalStateException("initialized already");
72          }
73  
74          for (int i = 0; i < bosses.length; i++) {
75              bosses[i] = newBoss(bossExecutor);
76          }
77  
78          waitForBossThreads();
79      }
80  
81      private void waitForBossThreads() {
82          long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(INITIALIZATION_TIMEOUT);
83          boolean warn = false;
84          for (Boss boss: bosses) {
85              if (!(boss instanceof AbstractNioSelector)) {
86                  continue;
87              }
88  
89              AbstractNioSelector selector = (AbstractNioSelector) boss;
90              long waitTime = deadline - System.nanoTime();
91              try {
92                  if (waitTime <= 0) {
93                      if (selector.thread == null) {
94                          warn = true;
95                          break;
96                      }
97                  } else if (!selector.startupLatch.await(waitTime, TimeUnit.NANOSECONDS)) {
98                      warn = true;
99                      break;
100                 }
101             } catch (InterruptedException ignore) {
102                 // Stop waiting for the boss threads and let someone else take care of the interruption.
103                 Thread.currentThread().interrupt();
104                 break;
105             }
106         }
107 
108         if (warn) {
109             logger.warn(
110                     "Failed to get all boss threads ready within " + INITIALIZATION_TIMEOUT + " second(s). " +
111                     "Make sure to specify the executor which has more threads than the requested bossCount. " +
112                     "If unsure, use Executors.newCachedThreadPool().");
113         }
114     }
115 
116     /**
117      * Create a new {@link Boss} which uses the given {@link Executor} to service IO
118      *
119      *
120      * @param executor the {@link Executor} to use
121      * @return worker the new {@link Boss}
122      */
123     protected abstract E newBoss(Executor executor);
124 
125     @SuppressWarnings("unchecked")
126     public E nextBoss() {
127         return (E) bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
128     }
129 
130     public void rebuildSelectors() {
131         for (Boss boss: bosses) {
132             boss.rebuildSelector();
133         }
134     }
135 
136     public void releaseExternalResources() {
137         shutdown();
138         ExecutorUtil.shutdownNow(bossExecutor);
139     }
140 
141     public void shutdown() {
142         for (Boss boss: bosses) {
143             boss.shutdown();
144         }
145     }
146 }