/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package io.netty.buffer;


import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;

import io.netty.buffer.PoolArena.SizeClass;
import io.netty.util.Recycler.EnhancedHandle;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.ObjectPool;
import io.netty.util.internal.ObjectPool.Handle;
import io.netty.util.internal.ObjectPool.ObjectCreator;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Acts as a Thread cache for allocations. This implementation is modeled after
 * <a href="https://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the described
 * techniques of
 * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
 * Scalable memory allocation using jemalloc</a>.
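 * <p>
 * Illustrative sketch (simplified from the allocation path in {@link PoolArena}): the pooled allocator asks the
 * calling thread's cache first and only falls back to the arena on a miss.
 * <pre>{@code
 * // simplified; the real call sites live in PoolArena
 * if (!cache.allocateSmall(arena, buf, reqCapacity, sizeIdx)) {
 *     // cache miss: allocate from the arena's subpages / chunks instead
 * }
 * }</pre>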
 */
final class PoolThreadCache {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
    private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;

    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // Hold the caches for the different size classes, which are small and normal.
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();
    @SuppressWarnings("unused") // Field is only here for the finalizer.
    private final FreeOnFinalize freeOnFinalize;

    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
                    int freeSweepAllocationThreshold, boolean useFinalizer) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        this.heapArena = heapArena;
        this.directArena = directArena;
        if (directArena != null) {
            smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.sizeClass.nSubpages);
            normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArena is configured so just null out all caches
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.sizeClass.nSubpages);
            normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArena is configured so just null out all caches
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
        }

        // Only check if there are caches in use.
        if ((smallSubPageDirectCaches != null || normalDirectCaches != null
                || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
        freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null;
    }

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches) {
        if (cacheSize > 0 && numCaches > 0) {
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }

    @SuppressWarnings("unchecked")
    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
            int max = Math.min(area.sizeClass.chunkSize, maxCachedBufferCapacity);
            // Create as many normal caches as we support based on how many sizeIdx we have and what the upper
            // bound is that we want to cache in general.
            List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>();
            for (int idx = area.sizeClass.nSubpages; idx < area.sizeClass.nSizes &&
                    area.sizeClass.sizeIdx2size(idx) <= max; idx++) {
                cache.add(new NormalMemoryRegionCache<T>(cacheSize));
            }
            return cache.toArray(new MemoryRegionCache[0]);
        } else {
            return null;
        }
    }

    // val > 0
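    // (Illustrative: log2(1) == 0, log2(8192) == 13. Callers must guarantee a positive argument,
    //  otherwise Integer.numberOfLeadingZeros(0) would make the result a bogus -1.)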
    static int log2(int val) {
        return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
    }

    /**
     * Try to allocate a small buffer out of the cache. Returns {@code true} if successful, {@code false} otherwise.
     */
    boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
        return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
    }

    /**
     * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful, {@code false} otherwise.
     */
    boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
        return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            // no cache found so just return false here
            return false;
        }
        boolean allocated = cache.allocate(buf, reqCapacity, this);
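        // Every freeSweepAllocationThreshold allocation attempts through this cache, trim all region caches so
        // that entries which have not been allocated from often enough are handed back to the arena.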
        if (++ allocations >= freeSweepAllocationThreshold) {
            allocations = 0;
            trim();
        }
        return allocated;
    }

    /**
     * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
     * Returns {@code true} if it fit into the cache, {@code false} otherwise.
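     * <p>
     * This is the release path: when a pooled buffer is freed, the owning arena typically offers the chunk and
     * handle to this cache first, so a later allocation on the same thread can reuse the memory without going
     * back to the arena.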
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
                long handle, int normCapacity, SizeClass sizeClass) {
        int sizeIdx = area.sizeClass.size2SizeIdx(normCapacity);
        MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
        if (cache == null) {
            return false;
        }
        if (freed.get()) {
            return false;
        }
        return cache.add(chunk, nioBuffer, handle, normCapacity);
    }

    private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
        switch (sizeClass) {
        case Normal:
            return cacheForNormal(area, sizeIdx);
        case Small:
            return cacheForSmall(area, sizeIdx);
        default:
            throw new Error();
        }
    }

    /**
     * Should be called if the Thread that uses this cache is about to exit, to release resources out of the cache.
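     * <p>
     * In practice this is invoked from {@code FastThreadLocal#onRemoval(...)} when the owning thread goes away,
     * or from the finalizer if {@code useFinalizer} was enabled; the guard below makes sure the caches are only
     * freed once.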
     */
    void free(boolean finalizer) {
        // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
        // we only call this one time.
        if (freed.compareAndSet(false, true)) {
            int numFreed = free(smallSubPageDirectCaches, finalizer) +
                    free(normalDirectCaches, finalizer) +
                    free(smallSubPageHeapCaches, finalizer) +
                    free(normalHeapCaches, finalizer);

            if (numFreed > 0 && logger.isDebugEnabled()) {
                logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
                        Thread.currentThread().getName());
            }

            if (directArena != null) {
                directArena.numThreadCaches.getAndDecrement();
            }

            if (heapArena != null) {
                heapArena.numThreadCaches.getAndDecrement();
            }
        } else {
            // See https://github.com/netty/netty/issues/12749
            checkCacheMayLeak(smallSubPageDirectCaches, "SmallSubPageDirectCaches");
            checkCacheMayLeak(normalDirectCaches, "NormalDirectCaches");
            checkCacheMayLeak(smallSubPageHeapCaches, "SmallSubPageHeapCaches");
            checkCacheMayLeak(normalHeapCaches, "NormalHeapCaches");
        }
    }

    private static void checkCacheMayLeak(MemoryRegionCache<?>[] caches, String type) {
        if (caches == null) {
            // This size class is not cached at all (see the constructor), so there is nothing to check.
            return;
        }
        for (MemoryRegionCache<?> cache : caches) {
            if (!cache.queue.isEmpty()) {
                logger.debug("{} memory may leak.", type);
                return;
            }
        }
    }

    private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
        if (caches == null) {
            return 0;
        }

        int numFreed = 0;
        for (MemoryRegionCache<?> c: caches) {
            numFreed += free(c, finalizer);
        }
        return numFreed;
    }

    private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
        if (cache == null) {
            return 0;
        }
        return cache.free(finalizer);
    }

    void trim() {
        trim(smallSubPageDirectCaches);
        trim(normalDirectCaches);
        trim(smallSubPageHeapCaches);
        trim(normalHeapCaches);
    }

    private static void trim(MemoryRegionCache<?>[] caches) {
        if (caches == null) {
            return;
        }
        for (MemoryRegionCache<?> c: caches) {
            trim(c);
        }
    }

    private static void trim(MemoryRegionCache<?> cache) {
        if (cache == null) {
            return;
        }
        cache.trim();
    }

    private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
        if (area.isDirect()) {
            return cache(smallSubPageDirectCaches, sizeIdx);
        }
        return cache(smallSubPageHeapCaches, sizeIdx);
    }

    private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
        // We need to subtract area.sizeClass.nSubpages as sizeIdx is the overall index for all sizes.
        int idx = sizeIdx - area.sizeClass.nSubpages;
        if (area.isDirect()) {
            return cache(normalDirectCaches, idx);
        }
        return cache(normalHeapCaches, idx);
    }

    private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
        if (cache == null || sizeIdx > cache.length - 1) {
            return null;
        }
        return cache[sizeIdx];
    }

    /**
     * Cache used for buffers which are backed by SMALL size.
     */
    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
        SubPageMemoryRegionCache(int size) {
            super(size, SizeClass.Small);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
                PoolThreadCache threadCache) {
            chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
        }
    }

    /**
     * Cache used for buffers which are backed by NORMAL size.
     */
    private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
        NormalMemoryRegionCache(int size) {
            super(size, SizeClass.Normal);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
                PoolThreadCache threadCache) {
            chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
        }
    }

    private abstract static class MemoryRegionCache<T> {
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        private int allocations;

        MemoryRegionCache(int size, SizeClass sizeClass) {
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
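            // Bounded MPSC queue: entries can be offered from whatever thread releases a buffer that was
            // allocated through this cache, while draining normally happens on the owning thread only.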
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }

        /**
         * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
         */
        protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
                                        PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);

        /**
         * Add to cache if not already full.
         */
        @SuppressWarnings("unchecked")
        public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
            Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
            boolean queued = queue.offer(entry);
            if (!queued) {
                // If it was not possible to cache the chunk, immediately recycle the entry
                entry.unguardedRecycle();
            }

            return queued;
        }

        /**
         * Allocate something out of the cache if possible and remove the entry from the cache.
         */
        public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
            Entry<T> entry = queue.poll();
            if (entry == null) {
                return false;
            }
            initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
            entry.unguardedRecycle();

            // allocations is not thread-safe which is fine as this is only called from the same thread all the time.
            ++ allocations;
            return true;
        }

        /**
         * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
         */
        public final int free(boolean finalizer) {
            return free(Integer.MAX_VALUE, finalizer);
        }

        private int free(int max, boolean finalizer) {
            int numFreed = 0;
            for (; numFreed < max; numFreed++) {
                Entry<T> entry = queue.poll();
                if (entry != null) {
                    freeEntry(entry, finalizer);
                } else {
                    // all cleared
                    return numFreed;
                }
            }
            return numFreed;
        }

        /**
         * Free up cached {@link PoolChunk}s if not allocated frequently enough.
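         * <p>
         * The heuristic: {@code size - allocations} is the portion of the cache's capacity that was not used
         * since the last sweep; up to that many queued entries are freed back to the arena.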
         */
        public final void trim() {
            int free = size - allocations;
            allocations = 0;

            // We did not even allocate as many times as this cache can hold, so free the unused part.
            if (free > 0) {
                free(free, false);
            }
        }

        @SuppressWarnings({ "unchecked", "rawtypes" })
        private void freeEntry(Entry entry, boolean finalizer) {
            // Capture entry state before we recycle the entry object.
            PoolChunk chunk = entry.chunk;
            long handle = entry.handle;
            ByteBuffer nioBuffer = entry.nioBuffer;
            int normCapacity = entry.normCapacity;

            if (!finalizer) {
                // recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
                // a finalizer.
                entry.recycle();
            }

            chunk.arena.freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, finalizer);
        }

        static final class Entry<T> {
            final EnhancedHandle<Entry<?>> recyclerHandle;
            PoolChunk<T> chunk;
            ByteBuffer nioBuffer;
            long handle = -1;
            int normCapacity;

            Entry(Handle<Entry<?>> recyclerHandle) {
                this.recyclerHandle = (EnhancedHandle<Entry<?>>) recyclerHandle;
            }

            void recycle() {
                chunk = null;
                nioBuffer = null;
                handle = -1;
                recyclerHandle.recycle(this);
            }

            void unguardedRecycle() {
                chunk = null;
                nioBuffer = null;
                handle = -1;
                recyclerHandle.unguardedRecycle(this);
            }
        }

        @SuppressWarnings("rawtypes")
        private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
            Entry entry = RECYCLER.get();
            entry.chunk = chunk;
            entry.nioBuffer = nioBuffer;
            entry.handle = handle;
            entry.normCapacity = normCapacity;
            return entry;
        }

        @SuppressWarnings("rawtypes")
        private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
            @SuppressWarnings("unchecked")
            @Override
            public Entry newObject(Handle<Entry> handle) {
                return new Entry(handle);
            }
        });
    }

    private static final class FreeOnFinalize {
        private final PoolThreadCache cache;

        private FreeOnFinalize(PoolThreadCache cache) {
            this.cache = cache;
        }

        // TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
        @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
        @Override
        protected void finalize() throws Throwable {
            try {
                super.finalize();
            } finally {
                cache.free(true);
            }
        }
    }
}