1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.buffer;
18
19
20 import io.netty.buffer.PoolArena.SizeClass;
21 import io.netty.util.Recycler;
22 import io.netty.util.Recycler.Handle;
23 import io.netty.util.internal.MathUtil;
24 import io.netty.util.internal.PlatformDependent;
25 import io.netty.util.internal.logging.InternalLogger;
26 import io.netty.util.internal.logging.InternalLoggerFactory;
27
28 import java.nio.ByteBuffer;
29 import java.util.Queue;
30
31
32
33
34
35
36
37
38 final class PoolThreadCache {
39
// Logger used by free() to report how many cached buffers were released.
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

// Arenas this cache was created for. Either may be null; the constructor then
// leaves the matching cache arrays null as well.
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;

// Per-size-class cache arrays. Each array is null when its arena is null or when
// the corresponding create*Caches(...) factory returned null.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

// log2 of the arena's pageSize (or -1 when the arena is null); cacheForNormal()
// shifts the normalized capacity by this amount before computing the cache index.
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
// Number of allocate(...) attempts against an existing cache after which trim() is run.
private final int freeSweepAllocationThreshold;

// Allocation attempts counted since the last trim(); reset to 0 whenever the threshold is hit.
private int allocations;
59
60
61
62
/**
 * Builds the per-size-class caches for the given arenas and registers this cache
 * with each non-null arena by incrementing its {@code numThreadCaches} counter.
 *
 * <p>Registration happens only after every argument has been validated, so a
 * rejected construction never leaves an arena counter incremented for a cache
 * that does not exist.
 *
 * @param heapArena                    arena backing heap buffers, or {@code null} to disable heap caching
 * @param directArena                  arena backing direct buffers, or {@code null} to disable direct caching
 * @param tinyCacheSize                entries per tiny cache; {@code <= 0} disables the tiny caches
 * @param smallCacheSize               entries per small cache; {@code <= 0} disables the small caches
 * @param normalCacheSize              entries per normal cache; {@code <= 0} disables the normal caches
 * @param maxCachedBufferCapacity      upper bound used when sizing the normal caches; must be {@code >= 0}
 * @param freeSweepAllocationThreshold number of allocation attempts between two {@link #trim()} runs;
 *                                     must be {@code > 0} when at least one cache array was created
 * @throws IllegalArgumentException if {@code maxCachedBufferCapacity < 0}, or if any cache was created
 *                                  and {@code freeSweepAllocationThreshold < 1}
 */
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
    if (maxCachedBufferCapacity < 0) {
        throw new IllegalArgumentException("maxCachedBufferCapacity: "
                + maxCachedBufferCapacity + " (expected: >= 0)");
    }
    this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
    this.heapArena = heapArena;
    this.directArena = directArena;
    if (directArena != null) {
        tinySubPageDirectCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        smallSubPageDirectCaches = createSubPageCaches(
                smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

        numShiftsNormalDirect = log2(directArena.pageSize);
        normalDirectCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, directArena);
    } else {
        // No direct arena: nothing to cache for direct buffers.
        tinySubPageDirectCaches = null;
        smallSubPageDirectCaches = null;
        normalDirectCaches = null;
        numShiftsNormalDirect = -1;
    }
    if (heapArena != null) {
        tinySubPageHeapCaches = createSubPageCaches(
                tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
        smallSubPageHeapCaches = createSubPageCaches(
                smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

        numShiftsNormalHeap = log2(heapArena.pageSize);
        normalHeapCaches = createNormalCaches(
                normalCacheSize, maxCachedBufferCapacity, heapArena);
    } else {
        // No heap arena: nothing to cache for heap buffers.
        tinySubPageHeapCaches = null;
        smallSubPageHeapCaches = null;
        normalHeapCaches = null;
        numShiftsNormalHeap = -1;
    }

    // The sweep threshold only matters when at least one cache array exists.
    if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
            || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
            && freeSweepAllocationThreshold < 1) {
        throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                + freeSweepAllocationThreshold + " (expected: > 0)");
    }

    // Register with the arenas last: doing it before validation would leave the
    // counters incremented for a cache whose constructor then threw.
    if (directArena != null) {
        directArena.numThreadCaches.getAndIncrement();
    }
    if (heapArena != null) {
        heapArena.numThreadCaches.getAndIncrement();
    }
}
119
120 private static <T> MemoryRegionCache<T>[] createSubPageCaches(
121 int cacheSize, int numCaches, SizeClass sizeClass) {
122 if (cacheSize > 0 && numCaches > 0) {
123 @SuppressWarnings("unchecked")
124 MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
125 for (int i = 0; i < cache.length; i++) {
126
127 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
128 }
129 return cache;
130 } else {
131 return null;
132 }
133 }
134
135 private static <T> MemoryRegionCache<T>[] createNormalCaches(
136 int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
137 if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
138 int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
139 int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
140
141 @SuppressWarnings("unchecked")
142 MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
143 for (int i = 0; i < cache.length; i++) {
144 cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
145 }
146 return cache;
147 } else {
148 return null;
149 }
150 }
151
152 private static int log2(int val) {
153 int res = 0;
154 while (val > 1) {
155 val >>= 1;
156 res++;
157 }
158 return res;
159 }
160
161
162
163
164 boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
165 return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
166 }
167
168
169
170
171 boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
172 return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
173 }
174
175
176
177
178 boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
179 return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
180 }
181
182 @SuppressWarnings({ "unchecked", "rawtypes" })
183 private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
184 if (cache == null) {
185
186 return false;
187 }
188 boolean allocated = cache.allocate(buf, reqCapacity);
189 if (++ allocations >= freeSweepAllocationThreshold) {
190 allocations = 0;
191 trim();
192 }
193 return allocated;
194 }
195
196
197
198
199
200 @SuppressWarnings({ "unchecked", "rawtypes" })
201 boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
202 MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
203 if (cache == null) {
204 return false;
205 }
206 return cache.add(chunk, handle);
207 }
208
209 private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
210 switch (sizeClass) {
211 case Normal:
212 return cacheForNormal(area, normCapacity);
213 case Small:
214 return cacheForSmall(area, normCapacity);
215 case Tiny:
216 return cacheForTiny(area, normCapacity);
217 default:
218 throw new Error();
219 }
220 }
221
222
223
224
225 void free() {
226 int numFreed = free(tinySubPageDirectCaches) +
227 free(smallSubPageDirectCaches) +
228 free(normalDirectCaches) +
229 free(tinySubPageHeapCaches) +
230 free(smallSubPageHeapCaches) +
231 free(normalHeapCaches);
232
233 if (numFreed > 0 && logger.isDebugEnabled()) {
234 logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, Thread.currentThread().getName());
235 }
236
237 if (directArena != null) {
238 directArena.numThreadCaches.getAndDecrement();
239 }
240
241 if (heapArena != null) {
242 heapArena.numThreadCaches.getAndDecrement();
243 }
244 }
245
246 private static int free(MemoryRegionCache<?>[] caches) {
247 if (caches == null) {
248 return 0;
249 }
250
251 int numFreed = 0;
252 for (MemoryRegionCache<?> c: caches) {
253 numFreed += free(c);
254 }
255 return numFreed;
256 }
257
258 private static int free(MemoryRegionCache<?> cache) {
259 if (cache == null) {
260 return 0;
261 }
262 return cache.free();
263 }
264
265 void trim() {
266 trim(tinySubPageDirectCaches);
267 trim(smallSubPageDirectCaches);
268 trim(normalDirectCaches);
269 trim(tinySubPageHeapCaches);
270 trim(smallSubPageHeapCaches);
271 trim(normalHeapCaches);
272 }
273
274 private static void trim(MemoryRegionCache<?>[] caches) {
275 if (caches == null) {
276 return;
277 }
278 for (MemoryRegionCache<?> c: caches) {
279 trim(c);
280 }
281 }
282
283 private static void trim(MemoryRegionCache<?> cache) {
284 if (cache == null) {
285 return;
286 }
287 cache.trim();
288 }
289
290 private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
291 int idx = PoolArena.tinyIdx(normCapacity);
292 if (area.isDirect()) {
293 return cache(tinySubPageDirectCaches, idx);
294 }
295 return cache(tinySubPageHeapCaches, idx);
296 }
297
298 private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
299 int idx = PoolArena.smallIdx(normCapacity);
300 if (area.isDirect()) {
301 return cache(smallSubPageDirectCaches, idx);
302 }
303 return cache(smallSubPageHeapCaches, idx);
304 }
305
306 private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
307 if (area.isDirect()) {
308 int idx = log2(normCapacity >> numShiftsNormalDirect);
309 return cache(normalDirectCaches, idx);
310 }
311 int idx = log2(normCapacity >> numShiftsNormalHeap);
312 return cache(normalHeapCaches, idx);
313 }
314
315 private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
316 if (cache == null || idx > cache.length - 1) {
317 return null;
318 }
319 return cache[idx];
320 }
321
322
323
324
325 private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
326 SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
327 super(size, sizeClass);
328 }
329
330 @Override
331 protected void initBuf(
332 PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
333 chunk.initBufWithSubpage(buf, handle, reqCapacity);
334 }
335 }
336
337
338
339
340 private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
341 NormalMemoryRegionCache(int size) {
342 super(size, SizeClass.Normal);
343 }
344
345 @Override
346 protected void initBuf(
347 PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
348 chunk.initBuf(buf, handle, reqCapacity);
349 }
350 }
351
352 private abstract static class MemoryRegionCache<T> {
353 private final int size;
354 private final Queue<Entry<T>> queue;
355 private final SizeClass sizeClass;
356 private int allocations;
357
358 MemoryRegionCache(int size, SizeClass sizeClass) {
359 this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
360 queue = PlatformDependent.newFixedMpscQueue(this.size);
361 this.sizeClass = sizeClass;
362 }
363
364
365
366
367 protected abstract void initBuf(PoolChunk<T> chunk, long handle,
368 PooledByteBuf<T> buf, int reqCapacity);
369
370
371
372
373 @SuppressWarnings("unchecked")
374 public final boolean add(PoolChunk<T> chunk, long handle) {
375 Entry<T> entry = newEntry(chunk, handle);
376 boolean queued = queue.offer(entry);
377 if (!queued) {
378
379 entry.recycle();
380 }
381
382 return queued;
383 }
384
385
386
387
388 public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
389 Entry<T> entry = queue.poll();
390 if (entry == null) {
391 return false;
392 }
393 initBuf(entry.chunk, entry.handle, buf, reqCapacity);
394 entry.recycle();
395
396
397 ++ allocations;
398 return true;
399 }
400
401
402
403
404 public final int free() {
405 return free(Integer.MAX_VALUE);
406 }
407
408 private int free(int max) {
409 int numFreed = 0;
410 for (; numFreed < max; numFreed++) {
411 Entry<T> entry = queue.poll();
412 if (entry != null) {
413 freeEntry(entry);
414 } else {
415
416 return numFreed;
417 }
418 }
419 return numFreed;
420 }
421
422
423
424
425 public final void trim() {
426 int free = size - allocations;
427 allocations = 0;
428
429
430 if (free > 0) {
431 free(free);
432 }
433 }
434
435 @SuppressWarnings({ "unchecked", "rawtypes" })
436 private void freeEntry(Entry entry) {
437 PoolChunk chunk = entry.chunk;
438 long handle = entry.handle;
439
440
441 entry.recycle();
442
443 chunk.arena.freeChunk(chunk, handle, sizeClass);
444 }
445
446 static final class Entry<T> {
447 final Handle recyclerHandle;
448 PoolChunk<T> chunk;
449 long handle = -1;
450
451 Entry(Handle recyclerHandle) {
452 this.recyclerHandle = recyclerHandle;
453 }
454
455 void recycle() {
456 chunk = null;
457 handle = -1;
458 RECYCLER.recycle(this, recyclerHandle);
459 }
460 }
461
462 @SuppressWarnings("rawtypes")
463 private static Entry newEntry(PoolChunk<?> chunk, long handle) {
464 Entry entry = RECYCLER.get();
465 entry.chunk = chunk;
466 entry.handle = handle;
467 return entry;
468 }
469
470 @SuppressWarnings("rawtypes")
471 private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
472 @Override
473 protected Entry newObject(Handle handle) {
474 return new Entry(handle);
475 }
476 };
477 }
478 }