1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.buffer;
18
19
20 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21
22 import io.netty.buffer.PoolArena.SizeClass;
23 import io.netty.util.Recycler.EnhancedHandle;
24 import io.netty.util.internal.MathUtil;
25 import io.netty.util.internal.ObjectPool;
26 import io.netty.util.internal.ObjectPool.Handle;
27 import io.netty.util.internal.ObjectPool.ObjectCreator;
28 import io.netty.util.internal.PlatformDependent;
29 import io.netty.util.internal.logging.InternalLogger;
30 import io.netty.util.internal.logging.InternalLoggerFactory;
31
32 import java.nio.ByteBuffer;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Queue;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38
39
40
41
42
43
44
45 final class PoolThreadCache {
46
47 private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
48 private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
49
50 final PoolArena<byte[]> heapArena;
51 final PoolArena<ByteBuffer> directArena;
52
53
54 private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
55 private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
56 private final MemoryRegionCache<byte[]>[] normalHeapCaches;
57 private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
58
59 private final int freeSweepAllocationThreshold;
60 private final AtomicBoolean freed = new AtomicBoolean();
61 @SuppressWarnings("unused")
62 private final FreeOnFinalize freeOnFinalize;
63
64 private int allocations;
65
66
67
68
69 PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
70 int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
71 int freeSweepAllocationThreshold, boolean useFinalizer) {
72 checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
73 this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
74 this.heapArena = heapArena;
75 this.directArena = directArena;
76 if (directArena != null) {
77 smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.sizeClass.nSubpages);
78 normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
79 directArena.numThreadCaches.getAndIncrement();
80 } else {
81
82 smallSubPageDirectCaches = null;
83 normalDirectCaches = null;
84 }
85 if (heapArena != null) {
86
87 smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.sizeClass.nSubpages);
88 normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
89 heapArena.numThreadCaches.getAndIncrement();
90 } else {
91
92 smallSubPageHeapCaches = null;
93 normalHeapCaches = null;
94 }
95
96
97 if ((smallSubPageDirectCaches != null || normalDirectCaches != null
98 || smallSubPageHeapCaches != null || normalHeapCaches != null)
99 && freeSweepAllocationThreshold < 1) {
100 throw new IllegalArgumentException("freeSweepAllocationThreshold: "
101 + freeSweepAllocationThreshold + " (expected: > 0)");
102 }
103 freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null;
104 }
105
106 private static <T> MemoryRegionCache<T>[] createSubPageCaches(
107 int cacheSize, int numCaches) {
108 if (cacheSize > 0 && numCaches > 0) {
109 @SuppressWarnings("unchecked")
110 MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
111 for (int i = 0; i < cache.length; i++) {
112
113 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
114 }
115 return cache;
116 } else {
117 return null;
118 }
119 }
120
121 @SuppressWarnings("unchecked")
122 private static <T> MemoryRegionCache<T>[] createNormalCaches(
123 int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
124 if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
125 int max = Math.min(area.sizeClass.chunkSize, maxCachedBufferCapacity);
126
127
128 List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;
129 for (int idx = area.sizeClass.nSubpages; idx < area.sizeClass.nSizes &&
130 area.sizeClass.sizeIdx2size(idx) <= max; idx++) {
131 cache.add(new NormalMemoryRegionCache<T>(cacheSize));
132 }
133 return cache.toArray(new MemoryRegionCache[0]);
134 } else {
135 return null;
136 }
137 }
138
139
140 static int log2(int val) {
141 return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
142 }
143
144
145
146
147 boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
148 return allocate(cacheForSmall(area, sizeIdx), buf, reqCapacity);
149 }
150
151
152
153
154 boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
155 return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
156 }
157
158 @SuppressWarnings({ "unchecked", "rawtypes" })
159 private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
160 if (cache == null) {
161
162 return false;
163 }
164 boolean allocated = cache.allocate(buf, reqCapacity, this);
165 if (++ allocations >= freeSweepAllocationThreshold) {
166 allocations = 0;
167 trim();
168 }
169 return allocated;
170 }
171
172
173
174
175
176 @SuppressWarnings({ "unchecked", "rawtypes" })
177 boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
178 long handle, int normCapacity, SizeClass sizeClass) {
179 int sizeIdx = area.sizeClass.size2SizeIdx(normCapacity);
180 MemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);
181 if (cache == null) {
182 return false;
183 }
184 if (freed.get()) {
185 return false;
186 }
187 return cache.add(chunk, nioBuffer, handle, normCapacity);
188 }
189
190 private MemoryRegionCache<?> cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass) {
191 switch (sizeClass) {
192 case Normal:
193 return cacheForNormal(area, sizeIdx);
194 case Small:
195 return cacheForSmall(area, sizeIdx);
196 default:
197 throw new Error();
198 }
199 }
200
201
202
203
204 void free(boolean finalizer) {
205
206
207 if (freed.compareAndSet(false, true)) {
208 int numFreed = free(smallSubPageDirectCaches, finalizer) +
209 free(normalDirectCaches, finalizer) +
210 free(smallSubPageHeapCaches, finalizer) +
211 free(normalHeapCaches, finalizer);
212
213 if (numFreed > 0 && logger.isDebugEnabled()) {
214 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,
215 Thread.currentThread().getName());
216 }
217
218 if (directArena != null) {
219 directArena.numThreadCaches.getAndDecrement();
220 }
221
222 if (heapArena != null) {
223 heapArena.numThreadCaches.getAndDecrement();
224 }
225 } else {
226
227 checkCacheMayLeak(smallSubPageDirectCaches, "SmallSubPageDirectCaches");
228 checkCacheMayLeak(normalDirectCaches, "NormalDirectCaches");
229 checkCacheMayLeak(smallSubPageHeapCaches, "SmallSubPageHeapCaches");
230 checkCacheMayLeak(normalHeapCaches, "NormalHeapCaches");
231 }
232 }
233
234 private static void checkCacheMayLeak(MemoryRegionCache<?>[] caches, String type) {
235 for (MemoryRegionCache<?> cache : caches) {
236 if (!cache.queue.isEmpty()) {
237 logger.debug("{} memory may leak.", type);
238 return;
239 }
240 }
241 }
242
243 private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {
244 if (caches == null) {
245 return 0;
246 }
247
248 int numFreed = 0;
249 for (MemoryRegionCache<?> c: caches) {
250 numFreed += free(c, finalizer);
251 }
252 return numFreed;
253 }
254
255 private static int free(MemoryRegionCache<?> cache, boolean finalizer) {
256 if (cache == null) {
257 return 0;
258 }
259 return cache.free(finalizer);
260 }
261
262 void trim() {
263 trim(smallSubPageDirectCaches);
264 trim(normalDirectCaches);
265 trim(smallSubPageHeapCaches);
266 trim(normalHeapCaches);
267 }
268
269 private static void trim(MemoryRegionCache<?>[] caches) {
270 if (caches == null) {
271 return;
272 }
273 for (MemoryRegionCache<?> c: caches) {
274 trim(c);
275 }
276 }
277
278 private static void trim(MemoryRegionCache<?> cache) {
279 if (cache == null) {
280 return;
281 }
282 cache.trim();
283 }
284
285 private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int sizeIdx) {
286 if (area.isDirect()) {
287 return cache(smallSubPageDirectCaches, sizeIdx);
288 }
289 return cache(smallSubPageHeapCaches, sizeIdx);
290 }
291
292 private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
293
294 int idx = sizeIdx - area.sizeClass.nSubpages;
295 if (area.isDirect()) {
296 return cache(normalDirectCaches, idx);
297 }
298 return cache(normalHeapCaches, idx);
299 }
300
301 private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
302 if (cache == null || sizeIdx > cache.length - 1) {
303 return null;
304 }
305 return cache[sizeIdx];
306 }
307
308
309
310
311 private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
312 SubPageMemoryRegionCache(int size) {
313 super(size, SizeClass.Small);
314 }
315
316 @Override
317 protected void initBuf(
318 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
319 PoolThreadCache threadCache) {
320 chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
321 }
322 }
323
324
325
326
327 private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
328 NormalMemoryRegionCache(int size) {
329 super(size, SizeClass.Normal);
330 }
331
332 @Override
333 protected void initBuf(
334 PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity,
335 PoolThreadCache threadCache) {
336 chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
337 }
338 }
339
340 private abstract static class MemoryRegionCache<T> {
341 private final int size;
342 private final Queue<Entry<T>> queue;
343 private final SizeClass sizeClass;
344 private int allocations;
345
346 MemoryRegionCache(int size, SizeClass sizeClass) {
347 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
348 queue = PlatformDependent.newFixedMpscQueue(this.size);
349 this.sizeClass = sizeClass;
350 }
351
352
353
354
355 protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
356 PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache);
357
358
359
360
361 @SuppressWarnings("unchecked")
362 public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
363 Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
364 boolean queued = queue.offer(entry);
365 if (!queued) {
366
367 entry.unguardedRecycle();
368 }
369
370 return queued;
371 }
372
373
374
375
376 public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity, PoolThreadCache threadCache) {
377 Entry<T> entry = queue.poll();
378 if (entry == null) {
379 return false;
380 }
381 initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
382 entry.unguardedRecycle();
383
384
385 ++ allocations;
386 return true;
387 }
388
389
390
391
392 public final int free(boolean finalizer) {
393 return free(Integer.MAX_VALUE, finalizer);
394 }
395
396 private int free(int max, boolean finalizer) {
397 int numFreed = 0;
398 for (; numFreed < max; numFreed++) {
399 Entry<T> entry = queue.poll();
400 if (entry != null) {
401 freeEntry(entry, finalizer);
402 } else {
403
404 return numFreed;
405 }
406 }
407 return numFreed;
408 }
409
410
411
412
413 public final void trim() {
414 int free = size - allocations;
415 allocations = 0;
416
417
418 if (free > 0) {
419 free(free, false);
420 }
421 }
422
423 @SuppressWarnings({ "unchecked", "rawtypes" })
424 private void freeEntry(Entry entry, boolean finalizer) {
425
426 PoolChunk chunk = entry.chunk;
427 long handle = entry.handle;
428 ByteBuffer nioBuffer = entry.nioBuffer;
429 int normCapacity = entry.normCapacity;
430
431 if (!finalizer) {
432
433
434 entry.recycle();
435 }
436
437 chunk.arena.freeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, finalizer);
438 }
439
440 static final class Entry<T> {
441 final EnhancedHandle<Entry<?>> recyclerHandle;
442 PoolChunk<T> chunk;
443 ByteBuffer nioBuffer;
444 long handle = -1;
445 int normCapacity;
446
447 Entry(Handle<Entry<?>> recyclerHandle) {
448 this.recyclerHandle = (EnhancedHandle<Entry<?>>) recyclerHandle;
449 }
450
451 void recycle() {
452 chunk = null;
453 nioBuffer = null;
454 handle = -1;
455 recyclerHandle.recycle(this);
456 }
457
458 void unguardedRecycle() {
459 chunk = null;
460 nioBuffer = null;
461 handle = -1;
462 recyclerHandle.unguardedRecycle(this);
463 }
464 }
465
466 @SuppressWarnings("rawtypes")
467 private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
468 Entry entry = RECYCLER.get();
469 entry.chunk = chunk;
470 entry.nioBuffer = nioBuffer;
471 entry.handle = handle;
472 entry.normCapacity = normCapacity;
473 return entry;
474 }
475
476 @SuppressWarnings("rawtypes")
477 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
478 @SuppressWarnings("unchecked")
479 @Override
480 public Entry newObject(Handle<Entry> handle) {
481 return new Entry(handle);
482 }
483 });
484 }
485
486 private static final class FreeOnFinalize {
487 private final PoolThreadCache cache;
488
489 private FreeOnFinalize(PoolThreadCache cache) {
490 this.cache = cache;
491 }
492
493
494 @SuppressWarnings({"FinalizeDeclaration", "deprecation"})
495 @Override
496 protected void finalize() throws Throwable {
497 try {
498 super.finalize();
499 } finally {
500 cache.free(true);
501 }
502 }
503 }
504 }