查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.buffer;
18  
19  
20  import io.netty.buffer.PoolArena.SizeClass;
21  import io.netty.util.Recycler;
22  import io.netty.util.Recycler.Handle;
23  import io.netty.util.internal.MathUtil;
24  import io.netty.util.internal.PlatformDependent;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  import java.nio.ByteBuffer;
29  import java.util.Queue;
30  
31  /**
32   * Acts a Thread cache for allocations. This implementation is moduled after
33   * <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the descripted
34   * technics of
35   * <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919">
36   * Scalable memory allocation using jemalloc</a>.
37   */
38  final class PoolThreadCache {
39  
40      private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
41  
42      final PoolArena<byte[]> heapArena;
43      final PoolArena<ByteBuffer> directArena;
44  
45      // Hold the caches for the different size classes, which are tiny, small and normal.
46      private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
47      private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
48      private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
49      private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
50      private final MemoryRegionCache<byte[]>[] normalHeapCaches;
51      private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
52  
53      // Used for bitshifting when calculate the index of normal caches later
54      private final int numShiftsNormalDirect;
55      private final int numShiftsNormalHeap;
56      private final int freeSweepAllocationThreshold;
57  
58      private int allocations;
59  
60      // TODO: Test if adding padding helps under contention
61      //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
62  
63      PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
64                      int tinyCacheSize, int smallCacheSize, int normalCacheSize,
65                      int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
66          if (maxCachedBufferCapacity < 0) {
67              throw new IllegalArgumentException("maxCachedBufferCapacity: "
68                      + maxCachedBufferCapacity + " (expected: >= 0)");
69          }
70          this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
71          this.heapArena = heapArena;
72          this.directArena = directArena;
73          if (directArena != null) {
74              tinySubPageDirectCaches = createSubPageCaches(
75                      tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
76              smallSubPageDirectCaches = createSubPageCaches(
77                      smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
78  
79              numShiftsNormalDirect = log2(directArena.pageSize);
80              normalDirectCaches = createNormalCaches(
81                      normalCacheSize, maxCachedBufferCapacity, directArena);
82  
83              directArena.numThreadCaches.getAndIncrement();
84          } else {
85              // No directArea is configured so just null out all caches
86              tinySubPageDirectCaches = null;
87              smallSubPageDirectCaches = null;
88              normalDirectCaches = null;
89              numShiftsNormalDirect = -1;
90          }
91          if (heapArena != null) {
92              // Create the caches for the heap allocations
93              tinySubPageHeapCaches = createSubPageCaches(
94                      tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
95              smallSubPageHeapCaches = createSubPageCaches(
96                      smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
97  
98              numShiftsNormalHeap = log2(heapArena.pageSize);
99              normalHeapCaches = createNormalCaches(
100                     normalCacheSize, maxCachedBufferCapacity, heapArena);
101 
102             heapArena.numThreadCaches.getAndIncrement();
103         } else {
104             // No heapArea is configured so just null out all caches
105             tinySubPageHeapCaches = null;
106             smallSubPageHeapCaches = null;
107             normalHeapCaches = null;
108             numShiftsNormalHeap = -1;
109         }
110 
111         // Only check if there are caches in use.
112         if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
113                 || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
114                 && freeSweepAllocationThreshold < 1) {
115             throw new IllegalArgumentException("freeSweepAllocationThreshold: "
116                     + freeSweepAllocationThreshold + " (expected: > 0)");
117         }
118     }
119 
120     private static <T> MemoryRegionCache<T>[] createSubPageCaches(
121             int cacheSize, int numCaches, SizeClass sizeClass) {
122         if (cacheSize > 0 && numCaches > 0) {
123             @SuppressWarnings("unchecked")
124             MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
125             for (int i = 0; i < cache.length; i++) {
126                 // TODO: maybe use cacheSize / cache.length
127                 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
128             }
129             return cache;
130         } else {
131             return null;
132         }
133     }
134 
135     private static <T> MemoryRegionCache<T>[] createNormalCaches(
136             int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
137         if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
138             int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
139             int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
140 
141             @SuppressWarnings("unchecked")
142             MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
143             for (int i = 0; i < cache.length; i++) {
144                 cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
145             }
146             return cache;
147         } else {
148             return null;
149         }
150     }
151 
152     private static int log2(int val) {
153         int res = 0;
154         while (val > 1) {
155             val >>= 1;
156             res++;
157         }
158         return res;
159     }
160 
161     /**
162      * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
163      */
164     boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
165         return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
166     }
167 
168     /**
169      * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
170      */
171     boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
172         return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
173     }
174 
175     /**
176      * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
177      */
178     boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
179         return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
180     }
181 
182     @SuppressWarnings({ "unchecked", "rawtypes" })
183     private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
184         if (cache == null) {
185             // no cache found so just return false here
186             return false;
187         }
188         boolean allocated = cache.allocate(buf, reqCapacity);
189         if (++ allocations >= freeSweepAllocationThreshold) {
190             allocations = 0;
191             trim();
192         }
193         return allocated;
194     }
195 
196     /**
197      * Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
198      * Returns {@code true} if it fit into the cache {@code false} otherwise.
199      */
200     @SuppressWarnings({ "unchecked", "rawtypes" })
201     boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
202         MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
203         if (cache == null) {
204             return false;
205         }
206         return cache.add(chunk, handle);
207     }
208 
209     private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
210         switch (sizeClass) {
211         case Normal:
212             return cacheForNormal(area, normCapacity);
213         case Small:
214             return cacheForSmall(area, normCapacity);
215         case Tiny:
216             return cacheForTiny(area, normCapacity);
217         default:
218             throw new Error();
219         }
220     }
221 
222     /**
223      *  Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
224      */
225     void free() {
226         int numFreed = free(tinySubPageDirectCaches) +
227                 free(smallSubPageDirectCaches) +
228                 free(normalDirectCaches) +
229                 free(tinySubPageHeapCaches) +
230                 free(smallSubPageHeapCaches) +
231                 free(normalHeapCaches);
232 
233         if (numFreed > 0 && logger.isDebugEnabled()) {
234             logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, Thread.currentThread().getName());
235         }
236 
237         if (directArena != null) {
238             directArena.numThreadCaches.getAndDecrement();
239         }
240 
241         if (heapArena != null) {
242             heapArena.numThreadCaches.getAndDecrement();
243         }
244     }
245 
246     private static int free(MemoryRegionCache<?>[] caches) {
247         if (caches == null) {
248             return 0;
249         }
250 
251         int numFreed = 0;
252         for (MemoryRegionCache<?> c: caches) {
253             numFreed += free(c);
254         }
255         return numFreed;
256     }
257 
258     private static int free(MemoryRegionCache<?> cache) {
259         if (cache == null) {
260             return 0;
261         }
262         return cache.free();
263     }
264 
265     void trim() {
266         trim(tinySubPageDirectCaches);
267         trim(smallSubPageDirectCaches);
268         trim(normalDirectCaches);
269         trim(tinySubPageHeapCaches);
270         trim(smallSubPageHeapCaches);
271         trim(normalHeapCaches);
272     }
273 
274     private static void trim(MemoryRegionCache<?>[] caches) {
275         if (caches == null) {
276             return;
277         }
278         for (MemoryRegionCache<?> c: caches) {
279             trim(c);
280         }
281     }
282 
283     private static void trim(MemoryRegionCache<?> cache) {
284         if (cache == null) {
285             return;
286         }
287         cache.trim();
288     }
289 
290     private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
291         int idx = PoolArena.tinyIdx(normCapacity);
292         if (area.isDirect()) {
293             return cache(tinySubPageDirectCaches, idx);
294         }
295         return cache(tinySubPageHeapCaches, idx);
296     }
297 
298     private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
299         int idx = PoolArena.smallIdx(normCapacity);
300         if (area.isDirect()) {
301             return cache(smallSubPageDirectCaches, idx);
302         }
303         return cache(smallSubPageHeapCaches, idx);
304     }
305 
306     private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
307         if (area.isDirect()) {
308             int idx = log2(normCapacity >> numShiftsNormalDirect);
309             return cache(normalDirectCaches, idx);
310         }
311         int idx = log2(normCapacity >> numShiftsNormalHeap);
312         return cache(normalHeapCaches, idx);
313     }
314 
315     private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
316         if (cache == null || idx > cache.length - 1) {
317             return null;
318         }
319         return cache[idx];
320     }
321 
322     /**
323      * Cache used for buffers which are backed by TINY or SMALL size.
324      */
325     private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
326         SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
327             super(size, sizeClass);
328         }
329 
330         @Override
331         protected void initBuf(
332                 PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
333             chunk.initBufWithSubpage(buf, handle, reqCapacity);
334         }
335     }
336 
337     /**
338      * Cache used for buffers which are backed by NORMAL size.
339      */
340     private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
341         NormalMemoryRegionCache(int size) {
342             super(size, SizeClass.Normal);
343         }
344 
345         @Override
346         protected void initBuf(
347                 PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
348             chunk.initBuf(buf, handle, reqCapacity);
349         }
350     }
351 
352     private abstract static class MemoryRegionCache<T> {
353         private final int size;
354         private final Queue<Entry<T>> queue;
355         private final SizeClass sizeClass;
356         private int allocations;
357 
358         MemoryRegionCache(int size, SizeClass sizeClass) {
359             this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
360             queue = PlatformDependent.newFixedMpscQueue(this.size);
361             this.sizeClass = sizeClass;
362         }
363 
364         /**
365          * Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
366          */
367         protected abstract void initBuf(PoolChunk<T> chunk, long handle,
368                                         PooledByteBuf<T> buf, int reqCapacity);
369 
370         /**
371          * Add to cache if not already full.
372          */
373         @SuppressWarnings("unchecked")
374         public final boolean add(PoolChunk<T> chunk, long handle) {
375             Entry<T> entry = newEntry(chunk, handle);
376             boolean queued = queue.offer(entry);
377             if (!queued) {
378                 // If it was not possible to cache the chunk, immediately recycle the entry
379                 entry.recycle();
380             }
381 
382             return queued;
383         }
384 
385         /**
386          * Allocate something out of the cache if possible and remove the entry from the cache.
387          */
388         public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
389             Entry<T> entry = queue.poll();
390             if (entry == null) {
391                 return false;
392             }
393             initBuf(entry.chunk, entry.handle, buf, reqCapacity);
394             entry.recycle();
395 
396             // allocations is not thread-safe which is fine as this is only called from the same thread all time.
397             ++ allocations;
398             return true;
399         }
400 
401         /**
402          * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
403          */
404         public final int free() {
405             return free(Integer.MAX_VALUE);
406         }
407 
408         private int free(int max) {
409             int numFreed = 0;
410             for (; numFreed < max; numFreed++) {
411                 Entry<T> entry = queue.poll();
412                 if (entry != null) {
413                     freeEntry(entry);
414                 } else {
415                     // all cleared
416                     return numFreed;
417                 }
418             }
419             return numFreed;
420         }
421 
422         /**
423          * Free up cached {@link PoolChunk}s if not allocated frequently enough.
424          */
425         public final void trim() {
426             int free = size - allocations;
427             allocations = 0;
428 
429             // We not even allocated all the number that are
430             if (free > 0) {
431                 free(free);
432             }
433         }
434 
435         @SuppressWarnings({ "unchecked", "rawtypes" })
436         private  void freeEntry(Entry entry) {
437             PoolChunk chunk = entry.chunk;
438             long handle = entry.handle;
439 
440             // recycle now so PoolChunk can be GC'ed.
441             entry.recycle();
442 
443             chunk.arena.freeChunk(chunk, handle, sizeClass);
444         }
445 
446         static final class Entry<T> {
447             final Handle recyclerHandle;
448             PoolChunk<T> chunk;
449             long handle = -1;
450 
451             Entry(Handle recyclerHandle) {
452                 this.recyclerHandle = recyclerHandle;
453             }
454 
455             void recycle() {
456                 chunk = null;
457                 handle = -1;
458                 RECYCLER.recycle(this, recyclerHandle);
459             }
460         }
461 
462         @SuppressWarnings("rawtypes")
463         private static Entry newEntry(PoolChunk<?> chunk, long handle) {
464             Entry entry = RECYCLER.get();
465             entry.chunk = chunk;
466             entry.handle = handle;
467             return entry;
468         }
469 
470         @SuppressWarnings("rawtypes")
471         private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
472             @Override
473             protected Entry newObject(Handle handle) {
474                 return new Entry(handle);
475             }
476         };
477     }
478 }