查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.pool;
17  
18  import io.netty.util.concurrent.Future;
19  import io.netty.util.concurrent.GenericFutureListener;
20  import io.netty.util.concurrent.GlobalEventExecutor;
21  import io.netty.util.concurrent.Promise;
22  import io.netty.util.internal.PlatformDependent;
23  import io.netty.util.internal.ReadOnlyIterator;
24  
25  import java.io.Closeable;
26  import java.util.Iterator;
27  import java.util.Map.Entry;
28  import java.util.concurrent.ConcurrentMap;
29  
30  import static io.netty.util.internal.ObjectUtil.checkNotNull;
31  
32  /**
33   * A skeletal {@link ChannelPoolMap} implementation. To find the right {@link ChannelPool}
34   * the {@link Object#hashCode()} and {@link Object#equals(Object)} is used.
35   */
36  public abstract class AbstractChannelPoolMap<K, P extends ChannelPool>
37          implements ChannelPoolMap<K, P>, Iterable<Entry<K, P>>, Closeable {
38      private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap();
39  
40      @Override
41      public final P get(K key) {
42          P pool = map.get(checkNotNull(key, "key"));
43          if (pool == null) {
44              pool = newPool(key);
45              P old = map.putIfAbsent(key, pool);
46              if (old != null) {
47                  // We need to destroy the newly created pool as we not use it.
48                  poolCloseAsyncIfSupported(pool);
49                  pool = old;
50              }
51          }
52          return pool;
53      }
54      /**
55       * Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns {@code true} if removed,
56       * {@code false} otherwise.
57       *
58       * If the removed pool extends {@link SimpleChannelPool} it will be closed asynchronously to avoid blocking in
59       * this method.
60       *
61       * Please note that {@code null} keys are not allowed.
62       */
63      public final boolean remove(K key) {
64          P pool =  map.remove(checkNotNull(key, "key"));
65          if (pool != null) {
66              poolCloseAsyncIfSupported(pool);
67              return true;
68          }
69          return false;
70      }
71  
72      /**
73       * Remove the {@link ChannelPool} from this {@link AbstractChannelPoolMap}. Returns a future that comletes with a
74       * {@code true} result if the pool has been removed by this call, otherwise the result is {@code false}.
75       *
76       * If the removed pool extends {@link SimpleChannelPool} it will be closed asynchronously to avoid blocking in
77       * this method. The returned future will be completed once this asynchronous pool close operation completes.
78       */
79      private Future<Boolean> removeAsyncIfSupported(K key) {
80          P pool =  map.remove(checkNotNull(key, "key"));
81          if (pool != null) {
82              final Promise<Boolean> removePromise = GlobalEventExecutor.INSTANCE.newPromise();
83              poolCloseAsyncIfSupported(pool).addListener(new GenericFutureListener<Future<? super Void>>() {
84                  @Override
85                  public void operationComplete(Future<? super Void> future) throws Exception {
86                      if (future.isSuccess()) {
87                          removePromise.setSuccess(Boolean.TRUE);
88                      } else {
89                          removePromise.setFailure(future.cause());
90                      }
91                  }
92              });
93              return removePromise;
94          }
95          return GlobalEventExecutor.INSTANCE.newSucceededFuture(Boolean.FALSE);
96      }
97  
98      /**
99       * If the pool implementation supports asynchronous close, then use it to avoid a blocking close call in case
100      * the ChannelPoolMap operations are called from an EventLoop.
101      *
102      * @param pool the ChannelPool to be closed
103      */
104     private static Future<Void> poolCloseAsyncIfSupported(ChannelPool pool) {
105         if (pool instanceof SimpleChannelPool) {
106             return ((SimpleChannelPool) pool).closeAsync();
107         } else {
108             try {
109                 pool.close();
110                 return GlobalEventExecutor.INSTANCE.newSucceededFuture(null);
111             } catch (Exception e) {
112                 return GlobalEventExecutor.INSTANCE.newFailedFuture(e);
113             }
114         }
115     }
116 
117     @Override
118     public final Iterator<Entry<K, P>> iterator() {
119         return new ReadOnlyIterator<Entry<K, P>>(map.entrySet().iterator());
120     }
121 
122     /**
123      * Returns the number of {@link ChannelPool}s currently in this {@link AbstractChannelPoolMap}.
124      */
125     public final int size() {
126         return map.size();
127     }
128 
129     /**
130      * Returns {@code true} if the {@link AbstractChannelPoolMap} is empty, otherwise {@code false}.
131      */
132     public final boolean isEmpty() {
133         return map.isEmpty();
134     }
135 
136     @Override
137     public final boolean contains(K key) {
138         return map.containsKey(checkNotNull(key, "key"));
139     }
140 
141     /**
142      * Called once a new {@link ChannelPool} needs to be created as non exists yet for the {@code key}.
143      */
144     protected abstract P newPool(K key);
145 
146     @Override
147     public final void close() {
148         for (K key: map.keySet()) {
149             // Wait for remove to finish to ensure that resources are released before returning from close
150             removeAsyncIfSupported(key).syncUninterruptibly();
151         }
152     }
153 }