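// [Web-viewer navigation residue] View the API documentation for this class | Back to the source index | Instant Messaging Network - the instant-messaging developer community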
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util;
17  
18  import io.netty.util.concurrent.FastThreadLocal;
19  import io.netty.util.concurrent.FastThreadLocalThread;
20  import io.netty.util.internal.ObjectPool;
21  import io.netty.util.internal.PlatformDependent;
22  import io.netty.util.internal.SystemPropertyUtil;
23  import io.netty.util.internal.UnstableApi;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  import org.jctools.queues.MessagePassingQueue;
27  import org.jetbrains.annotations.VisibleForTesting;
28  
29  import java.util.ArrayDeque;
30  import java.util.Queue;
31  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32  
33  import static io.netty.util.internal.PlatformDependent.newMpscQueue;
34  import static java.lang.Math.max;
35  import static java.lang.Math.min;
36  
37  /**
38   * Light-weight object pool based on a thread-local stack.
39   *
40   * @param <T> the type of the pooled object
41   */
42  public abstract class Recycler<T> {
43      private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
44      private static final EnhancedHandle<?> NOOP_HANDLE = new EnhancedHandle<Object>() {
45          @Override
46          public void recycle(Object object) {
47              // NOOP
48          }
49  
50          @Override
51          public void unguardedRecycle(final Object object) {
52              // NOOP
53          }
54  
55          @Override
56          public String toString() {
57              return "NOOP_HANDLE";
58          }
59      };
60      private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
61      private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
62      private static final int RATIO;
63      private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
64      private static final boolean BLOCKING_POOL;
65      private static final boolean BATCH_FAST_TL_ONLY;
66  
67      static {
68          // In the future, we might have different maxCapacity for different object types.
69          // e.g. io.netty.recycler.maxCapacity.writeTask
70          //      io.netty.recycler.maxCapacity.outboundBuffer
71          int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
72                  SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
73          if (maxCapacityPerThread < 0) {
74              maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
75          }
76  
77          DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
78          DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD = SystemPropertyUtil.getInt("io.netty.recycler.chunkSize", 32);
79  
80          // By default, we allow one push to a Recycler for each 8th try on handles that were never recycled before.
81          // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
82          // bursts.
83          RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
84  
85          BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
86          BATCH_FAST_TL_ONLY = SystemPropertyUtil.getBoolean("io.netty.recycler.batchFastThreadLocalOnly", true);
87  
88          if (logger.isDebugEnabled()) {
89              if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
90                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
91                  logger.debug("-Dio.netty.recycler.ratio: disabled");
92                  logger.debug("-Dio.netty.recycler.chunkSize: disabled");
93                  logger.debug("-Dio.netty.recycler.blocking: disabled");
94                  logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: disabled");
95              } else {
96                  logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
97                  logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
98                  logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
99                  logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
100                 logger.debug("-Dio.netty.recycler.batchFastThreadLocalOnly: {}", BATCH_FAST_TL_ONLY);
101             }
102         }
103     }
104 
105     private final int maxCapacityPerThread;
106     private final int interval;
107     private final int chunkSize;
108     private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
109         @Override
110         protected LocalPool<T> initialValue() {
111             return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
112         }
113 
114         @Override
115         protected void onRemoval(LocalPool<T> value) throws Exception {
116             super.onRemoval(value);
117             MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
118             value.pooledHandles = null;
119             value.owner = null;
120             handles.clear();
121         }
122     };
123 
124     protected Recycler() {
125         this(DEFAULT_MAX_CAPACITY_PER_THREAD);
126     }
127 
128     protected Recycler(int maxCapacityPerThread) {
129         this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
130     }
131 
132     /**
133      * @deprecated Use one of the following instead:
134      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
135      */
136     @Deprecated
137     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
138     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
139         this(maxCapacityPerThread, RATIO, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
140     }
141 
142     /**
143      * @deprecated Use one of the following instead:
144      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
145      */
146     @Deprecated
147     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
148     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
149                        int ratio, int maxDelayedQueuesPerThread) {
150         this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
151     }
152 
153     /**
154      * @deprecated Use one of the following instead:
155      * {@link #Recycler()}, {@link #Recycler(int)}, {@link #Recycler(int, int, int)}.
156      */
157     @Deprecated
158     @SuppressWarnings("unused") // Parameters we can't remove due to compatibility.
159     protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
160                        int ratio, int maxDelayedQueuesPerThread, int delayedQueueRatio) {
161         this(maxCapacityPerThread, ratio, DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
162     }
163 
164     protected Recycler(int maxCapacityPerThread, int ratio, int chunkSize) {
165         interval = max(0, ratio);
166         if (maxCapacityPerThread <= 0) {
167             this.maxCapacityPerThread = 0;
168             this.chunkSize = 0;
169         } else {
170             this.maxCapacityPerThread = max(4, maxCapacityPerThread);
171             this.chunkSize = max(2, min(chunkSize, this.maxCapacityPerThread >> 1));
172         }
173     }
174 
175     @SuppressWarnings("unchecked")
176     public final T get() {
177         if (maxCapacityPerThread == 0) {
178             return newObject((Handle<T>) NOOP_HANDLE);
179         }
180         LocalPool<T> localPool = threadLocal.get();
181         DefaultHandle<T> handle = localPool.claim();
182         T obj;
183         if (handle == null) {
184             handle = localPool.newHandle();
185             if (handle != null) {
186                 obj = newObject(handle);
187                 handle.set(obj);
188             } else {
189                 obj = newObject((Handle<T>) NOOP_HANDLE);
190             }
191         } else {
192             obj = handle.get();
193         }
194 
195         return obj;
196     }
197 
198     /**
199      * @deprecated use {@link Handle#recycle(Object)}.
200      */
201     @Deprecated
202     public final boolean recycle(T o, Handle<T> handle) {
203         if (handle == NOOP_HANDLE) {
204             return false;
205         }
206 
207         handle.recycle(o);
208         return true;
209     }
210 
211     @VisibleForTesting
212     final int threadLocalSize() {
213         LocalPool<T> localPool = threadLocal.getIfExists();
214         return localPool == null ? 0 : localPool.pooledHandles.size() + localPool.batch.size();
215     }
216 
217     /**
218      * @param handle can NOT be null.
219      */
220     protected abstract T newObject(Handle<T> handle);
221 
222     @SuppressWarnings("ClassNameSameAsAncestorName") // Can't change this due to compatibility.
223     public interface Handle<T> extends ObjectPool.Handle<T>  { }
224 
225     @UnstableApi
226     public abstract static class EnhancedHandle<T> implements Handle<T> {
227 
228         public abstract void unguardedRecycle(Object object);
229 
230         private EnhancedHandle() {
231         }
232     }
233 
234     private static final class DefaultHandle<T> extends EnhancedHandle<T> {
235         private static final int STATE_CLAIMED = 0;
236         private static final int STATE_AVAILABLE = 1;
237         private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> STATE_UPDATER;
238         static {
239             AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(DefaultHandle.class, "state");
240             //noinspection unchecked
241             STATE_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
242         }
243 
244         private volatile int state; // State is initialised to STATE_CLAIMED (aka. 0) so they can be released.
245         private final LocalPool<T> localPool;
246         private T value;
247 
248         DefaultHandle(LocalPool<T> localPool) {
249             this.localPool = localPool;
250         }
251 
252         @Override
253         public void recycle(Object object) {
254             if (object != value) {
255                 throw new IllegalArgumentException("object does not belong to handle");
256             }
257             localPool.release(this, true);
258         }
259 
260         @Override
261         public void unguardedRecycle(Object object) {
262             if (object != value) {
263                 throw new IllegalArgumentException("object does not belong to handle");
264             }
265             localPool.release(this, false);
266         }
267 
268         T get() {
269             return value;
270         }
271 
272         void set(T value) {
273             this.value = value;
274         }
275 
276         void toClaimed() {
277             assert state == STATE_AVAILABLE;
278             STATE_UPDATER.lazySet(this, STATE_CLAIMED);
279         }
280 
281         void toAvailable() {
282             int prev = STATE_UPDATER.getAndSet(this, STATE_AVAILABLE);
283             if (prev == STATE_AVAILABLE) {
284                 throw new IllegalStateException("Object has been recycled already.");
285             }
286         }
287 
288         void unguardedToAvailable() {
289             int prev = state;
290             if (prev == STATE_AVAILABLE) {
291                 throw new IllegalStateException("Object has been recycled already.");
292             }
293             STATE_UPDATER.lazySet(this, STATE_AVAILABLE);
294         }
295     }
296 
297     private static final class LocalPool<T> implements MessagePassingQueue.Consumer<DefaultHandle<T>> {
298         private final int ratioInterval;
299         private final int chunkSize;
300         private final ArrayDeque<DefaultHandle<T>> batch;
301         private volatile Thread owner;
302         private volatile MessagePassingQueue<DefaultHandle<T>> pooledHandles;
303         private int ratioCounter;
304 
305         @SuppressWarnings("unchecked")
306         LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
307             this.ratioInterval = ratioInterval;
308             this.chunkSize = chunkSize;
309             batch = new ArrayDeque<DefaultHandle<T>>(chunkSize);
310             Thread currentThread = Thread.currentThread();
311             owner = !BATCH_FAST_TL_ONLY || currentThread instanceof FastThreadLocalThread ? currentThread : null;
312             if (BLOCKING_POOL) {
313                 pooledHandles = new BlockingMessageQueue<DefaultHandle<T>>(maxCapacity);
314             } else {
315                 pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
316             }
317             ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
318         }
319 
320         DefaultHandle<T> claim() {
321             MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
322             if (handles == null) {
323                 return null;
324             }
325             if (batch.isEmpty()) {
326                 handles.drain(this, chunkSize);
327             }
328             DefaultHandle<T> handle = batch.pollFirst();
329             if (null != handle) {
330                 handle.toClaimed();
331             }
332             return handle;
333         }
334 
335         void release(DefaultHandle<T> handle, boolean guarded) {
336             if (guarded) {
337                 handle.toAvailable();
338             } else {
339                 handle.unguardedToAvailable();
340             }
341             Thread owner = this.owner;
342             if (owner != null && Thread.currentThread() == owner && batch.size() < chunkSize) {
343                 accept(handle);
344             } else if (owner != null && isTerminated(owner)) {
345                 this.owner = null;
346                 pooledHandles = null;
347             } else {
348                 MessagePassingQueue<DefaultHandle<T>> handles = pooledHandles;
349                 if (handles != null) {
350                     handles.relaxedOffer(handle);
351                 }
352             }
353         }
354 
355         private static boolean isTerminated(Thread owner) {
356             // Do not use `Thread.getState()` in J9 JVM because it's known to have a performance issue.
357             // See: https://github.com/netty/netty/issues/13347#issuecomment-1518537895
358             return PlatformDependent.isJ9Jvm() ? !owner.isAlive() : owner.getState() == Thread.State.TERMINATED;
359         }
360 
361         DefaultHandle<T> newHandle() {
362             if (++ratioCounter >= ratioInterval) {
363                 ratioCounter = 0;
364                 return new DefaultHandle<T>(this);
365             }
366             return null;
367         }
368 
369         @Override
370         public void accept(DefaultHandle<T> e) {
371             batch.addLast(e);
372         }
373     }
374 
375     /**
376      * This is an implementation of {@link MessagePassingQueue}, similar to what might be returned from
377      * {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose.
378      * The implementation relies on synchronised monitor locks for thread-safety.
379      * The {@code fill} bulk operation is not supported by this implementation.
380      */
381     private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
382         private final Queue<T> deque;
383         private final int maxCapacity;
384 
385         BlockingMessageQueue(int maxCapacity) {
386             this.maxCapacity = maxCapacity;
387             // This message passing queue is backed by an ArrayDeque instance,
388             // made thread-safe by synchronising on `this` BlockingMessageQueue instance.
389             // Why ArrayDeque?
390             // We use ArrayDeque instead of LinkedList or LinkedBlockingQueue because it's more space efficient.
391             // We use ArrayDeque instead of ArrayList because we need the queue APIs.
392             // We use ArrayDeque instead of ConcurrentLinkedQueue because CLQ is unbounded and has O(n) size().
393             // We use ArrayDeque instead of ArrayBlockingQueue because ABQ allocates its max capacity up-front,
394             // and these queues will usually have large capacities, in potentially great numbers (one per thread),
395             // but often only have comparatively few items in them.
396             deque = new ArrayDeque<T>();
397         }
398 
399         @Override
400         public synchronized boolean offer(T e) {
401             if (deque.size() == maxCapacity) {
402                 return false;
403             }
404             return deque.offer(e);
405         }
406 
407         @Override
408         public synchronized T poll() {
409             return deque.poll();
410         }
411 
412         @Override
413         public synchronized T peek() {
414             return deque.peek();
415         }
416 
417         @Override
418         public synchronized int size() {
419             return deque.size();
420         }
421 
422         @Override
423         public synchronized void clear() {
424             deque.clear();
425         }
426 
427         @Override
428         public synchronized boolean isEmpty() {
429             return deque.isEmpty();
430         }
431 
432         @Override
433         public int capacity() {
434             return maxCapacity;
435         }
436 
437         @Override
438         public boolean relaxedOffer(T e) {
439             return offer(e);
440         }
441 
442         @Override
443         public T relaxedPoll() {
444             return poll();
445         }
446 
447         @Override
448         public T relaxedPeek() {
449             return peek();
450         }
451 
452         @Override
453         public int drain(Consumer<T> c, int limit) {
454             T obj;
455             int i = 0;
456             for (; i < limit && (obj = poll()) != null; i++) {
457                 c.accept(obj);
458             }
459             return i;
460         }
461 
462         @Override
463         public int fill(Supplier<T> s, int limit) {
464             throw new UnsupportedOperationException();
465         }
466 
467         @Override
468         public int drain(Consumer<T> c) {
469             throw new UnsupportedOperationException();
470         }
471 
472         @Override
473         public int fill(Supplier<T> s) {
474             throw new UnsupportedOperationException();
475         }
476 
477         @Override
478         public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
479             throw new UnsupportedOperationException();
480         }
481 
482         @Override
483         public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
484             throw new UnsupportedOperationException();
485         }
486     }
487 }