package io.netty.buffer;

import io.netty.util.internal.LongCounter;
import io.netty.util.internal.PlatformDependent;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Description of the PageRun/PoolSubpage allocation algorithm used by PoolChunk.
 *
 * Notation:
 * > page  - a page is the smallest unit of memory that this chunk can allocate
 * > run   - a run is a collection of contiguous pages
 * > chunk - a chunk is a collection of runs
 * > chunkSize = number of pages * pageSize
 *
 * The chunk wraps a single contiguous memory region of chunkSize bytes. Whenever a PooledByteBuf of a
 * given size needs memory, we search for the first run that has enough free space to accommodate the
 * requested size and return a long handle that encodes the run's offset and length. The memory segment
 * is then marked as used, so it is owned by exactly one buffer at a time.
 *
 * All requested sizes are normalized to the nearest size class (see SizeClasses); a request larger
 * than pageSize is rounded up to a multiple of the page size.
 *
 * handle:
 * -------
 * A handle is a long; the bit layout of a run handle looks like:
 *
 * oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
 *
 * o: runOffset (page offset inside the chunk), 15 bits
 * s: size (number of pages) of this run, 15 bits
 * u: isUsed flag, 1 bit
 * e: isSubpage flag, 1 bit
 * b: bitmapIdx of the subpage element, zero if the handle does not refer to a subpage, 32 bits
 *
 * runsAvailMap:
 * -------------
 * A map that tracks available runs. For each run, the first page offset and the last page offset are
 * stored as keys, both mapping to the run's handle, so that adjacent free runs can be found and merged
 * when a run is freed.
 *
 * runsAvail:
 * ----------
 * An array of priority queues, one per page-size class. Each queue manages free runs of the same size
 * class and is ordered by offset, so allocation always prefers the run with the smallest offset.
 *
 * Algorithm:
 * ----------
 * As runs are allocated and freed, runsAvailMap and runsAvail are updated so the above properties hold.
 *
 * Initialization: the whole chunk is stored as a single free run with
 * runOffset = 0, size = chunkSize / pageSize, isUsed = no, isSubpage = no, bitmapIdx = 0.
 *
 * allocateRun(runSize):
 * 1) find the first queue in runsAvail, starting at the requested size class, that has a free run
 * 2) if the run holds more pages than requested, split it and save the trailing run for later use
 *
 * allocateSubpage(sizeIdx, head):
 * 1) allocate a run large enough to hold an integral number of elements of the requested size
 * 2) wrap it in a new PoolSubpage, register it in this chunk's subpages array and in the arena's
 *    subpage pool, then allocate one element from it
 *
 * free(handle, normCapacity, nioBuffer):
 * 1) if the handle refers to a subpage element, return the element to that subpage; if the subpage is
 *    still in use, stop here
 * 2) otherwise free the whole run: merge it with adjacent free runs
 * 3) insert the merged run back into runsAvail and runsAvailMap
 */
final class PoolChunk<T> implements PoolChunkMetric {
    private static final int SIZE_BIT_LENGTH = 15;
    private static final int INUSED_BIT_LENGTH = 1;
    private static final int SUBPAGE_BIT_LENGTH = 1;
    private static final int BITMAP_IDX_BIT_LENGTH = 32;

    static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
    static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
    static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
    static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
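
    // Illustrative example of the handle layout defined by the shifts above (values chosen for the
    // example, not taken from the original sources): a used run at page offset 5 spanning 3 pages is
    //   handle = (5L << RUN_OFFSET_SHIFT) | (3L << SIZE_SHIFT) | (1L << IS_USED_SHIFT)
    // and decodes back via runOffset(handle) == 5, runPages(handle) == 3, isUsed(handle) == true,
    // isSubpage(handle) == false and bitmapIdx(handle) == 0.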

    final PoolArena<T> arena;
    final Object base;
    final T memory;
    final boolean unpooled;

    /**
     * Stores the handle of each available run, keyed by the run's first page offset and, for runs of
     * more than one page, also by its last page offset. Used to find and merge adjacent free runs.
     */
    private final LongLongHashMap runsAvailMap;

    /**
     * An array of priority queues, one per page-size class. Each queue manages free runs of the same
     * size class, ordered by offset so that allocation always picks the run with the smallest offset.
     */
    private final IntPriorityQueue[] runsAvail;

    private final ReentrantLock runsAvailLock;

    /**
     * Manages all subpages in this chunk, indexed by the offset of the first page backing each subpage.
     */
    private final PoolSubpage<T>[] subpages;

    /**
     * Accounting of pinned memory: memory in this chunk that is currently in use by ByteBuf instances.
     */
    private final LongCounter pinnedBytes = PlatformDependent.newLongCounter();

    private final int pageSize;
    private final int pageShifts;
    private final int chunkSize;

    /**
     * Cache of NIO ByteBuffers created from the memory of this chunk. They are reused across pooled
     * buffers to avoid the cost of duplicating the backing buffer for every allocation.
     */
    private final Deque<ByteBuffer> cachedNioBuffers;

    int freeBytes;

    PoolChunkList<T> parent;
    PoolChunk<T> prev;
    PoolChunk<T> next;

    @SuppressWarnings("unchecked")
    PoolChunk(PoolArena<T> arena, Object base, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
        unpooled = false;
        this.arena = arena;
        this.base = base;
        this.memory = memory;
        this.pageSize = pageSize;
        this.pageShifts = pageShifts;
        this.chunkSize = chunkSize;
        freeBytes = chunkSize;

        runsAvail = newRunsAvailqueueArray(maxPageIdx);
        runsAvailLock = new ReentrantLock();
        runsAvailMap = new LongLongHashMap(-1);
        subpages = new PoolSubpage[chunkSize >> pageShifts];

        // insert the initial run that covers the whole chunk: offset = 0, pages = chunkSize / pageSize
        int pages = chunkSize >> pageShifts;
        long initHandle = (long) pages << SIZE_SHIFT;
        insertAvailRun(0, pages, initHandle);

        cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
    }

    /** Creates a special chunk that is not pooled. */
    PoolChunk(PoolArena<T> arena, Object base, T memory, int size) {
        unpooled = true;
        this.arena = arena;
        this.base = base;
        this.memory = memory;
        pageSize = 0;
        pageShifts = 0;
        runsAvailMap = null;
        runsAvail = null;
        runsAvailLock = null;
        subpages = null;
        chunkSize = size;
        cachedNioBuffers = null;
    }

    private static IntPriorityQueue[] newRunsAvailqueueArray(int size) {
        IntPriorityQueue[] queueArray = new IntPriorityQueue[size];
        for (int i = 0; i < queueArray.length; i++) {
            queueArray[i] = new IntPriorityQueue();
        }
        return queueArray;
    }

    private void insertAvailRun(int runOffset, int pages, long handle) {
        int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(pages);
        IntPriorityQueue queue = runsAvail[pageIdxFloor];
        assert isRun(handle);
        queue.offer((int) (handle >> BITMAP_IDX_BIT_LENGTH));

        // insert the first page of the run
        insertAvailRun0(runOffset, handle);
        if (pages > 1) {
            // insert the last page of the run
            insertAvailRun0(lastPage(runOffset, pages), handle);
        }
    }

    private void insertAvailRun0(int runOffset, long handle) {
        long pre = runsAvailMap.put(runOffset, handle);
        assert pre == -1;
    }

    private void removeAvailRun(long handle) {
        int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(runPages(handle));
        runsAvail[pageIdxFloor].remove((int) (handle >> BITMAP_IDX_BIT_LENGTH));
        removeAvailRun0(handle);
    }

    private void removeAvailRun0(long handle) {
        int runOffset = runOffset(handle);
        int pages = runPages(handle);
        // remove the first page of the run
        runsAvailMap.remove(runOffset);
        if (pages > 1) {
            // remove the last page of the run
            runsAvailMap.remove(lastPage(runOffset, pages));
        }
    }

    private static int lastPage(int runOffset, int pages) {
        return runOffset + pages - 1;
    }

    private long getAvailRunByOffset(int runOffset) {
        return runsAvailMap.get(runOffset);
    }

    @Override
    public int usage() {
        final int freeBytes;
        if (this.unpooled) {
            freeBytes = this.freeBytes;
        } else {
            runsAvailLock.lock();
            try {
                freeBytes = this.freeBytes;
            } finally {
                runsAvailLock.unlock();
            }
        }
        return usage(freeBytes);
    }

    private int usage(int freeBytes) {
        if (freeBytes == 0) {
            return 100;
        }

        int freePercentage = (int) (freeBytes * 100L / chunkSize);
        if (freePercentage == 0) {
            return 99;
        }
        return 100 - freePercentage;
    }
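
    // For illustration (example figures, not taken from the original sources): a 16 MiB chunk with
    // 4 MiB free gives freePercentage = 25, so usage() reports 75. A chunk with only a few bytes free
    // rounds freePercentage down to 0 and is reported as 99 rather than 100, since 100 is reserved
    // for a completely exhausted chunk.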

    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
        final long handle;
        if (sizeIdx <= arena.sizeClass.smallMaxSizeIdx) {
            final PoolSubpage<T> nextSub;
            // small allocation
            // Obtain the head of the PoolSubpage pool that is owned by the PoolArena and lock it.
            // This is needed as we may add the subpage back and so alter the linked-list structure.
            PoolSubpage<T> head = arena.smallSubpagePools[sizeIdx];
            head.lock();
            try {
                nextSub = head.next;
                if (nextSub != head) {
                    assert nextSub.doNotDestroy && nextSub.elemSize == arena.sizeClass.sizeIdx2size(sizeIdx) :
                            "doNotDestroy=" + nextSub.doNotDestroy + ", elemSize=" + nextSub.elemSize + ", sizeIdx=" +
                            sizeIdx;
                    handle = nextSub.allocate();
                    assert handle >= 0;
                    assert isSubpage(handle);
                    nextSub.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
                    return true;
                }
                handle = allocateSubpage(sizeIdx, head);
                if (handle < 0) {
                    return false;
                }
                assert isSubpage(handle);
            } finally {
                head.unlock();
            }
        } else {
            // normal (multi-page) allocation
            // runSize must be a multiple of pageSize
            int runSize = arena.sizeClass.sizeIdx2size(sizeIdx);
            handle = allocateRun(runSize);
            if (handle < 0) {
                return false;
            }
            assert !isSubpage(handle);
        }

        ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
        initBuf(buf, nioBuffer, handle, reqCapacity, cache);
        return true;
    }

    private long allocateRun(int runSize) {
        int pages = runSize >> pageShifts;
        int pageIdx = arena.sizeClass.pages2pageIdx(pages);

        runsAvailLock.lock();
        try {
            // find the first queue which has at least one big enough run
            int queueIdx = runFirstBestFit(pageIdx);
            if (queueIdx == -1) {
                return -1;
            }

            // get the run with the smallest offset in this queue
            IntPriorityQueue queue = runsAvail[queueIdx];
            long handle = queue.poll();
            assert handle != IntPriorityQueue.NO_VALUE;
            handle <<= BITMAP_IDX_BIT_LENGTH;
            assert !isUsed(handle) : "invalid handle: " + handle;

            removeAvailRun0(handle);

            handle = splitLargeRun(handle, pages);

            int pinnedSize = runSize(pageShifts, handle);
            freeBytes -= pinnedSize;
            return handle;
        } finally {
            runsAvailLock.unlock();
        }
    }

    private int calculateRunSize(int sizeIdx) {
        int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
        int runSize = 0;
        int nElements;

        final int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);

        // find the lowest common multiple of pageSize and elemSize
        do {
            runSize += pageSize;
            nElements = runSize / elemSize;
        } while (nElements < maxElements && runSize != nElements * elemSize);

        while (nElements > maxElements) {
            runSize -= pageSize;
            nElements = runSize / elemSize;
        }

        assert nElements > 0;
        assert runSize <= chunkSize;
        assert runSize >= elemSize;

        return runSize;
    }
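
    // Illustrative example (assumed values, not from the original sources): with pageSize = 8192
    // (pageShifts = 13) and elemSize = 1280, the loop above stops at runSize = 40960 (5 pages), the
    // least common multiple of page size and element size, giving nElements = 32 with no wasted space.
    // maxElements = 1 << (pageShifts - LOG2_QUANTUM) caps the element count at pageSize divided by the
    // minimum size quantum.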

    private int runFirstBestFit(int pageIdx) {
        if (freeBytes == chunkSize) {
            return arena.sizeClass.nPSizes - 1;
        }
        for (int i = pageIdx; i < arena.sizeClass.nPSizes; i++) {
            IntPriorityQueue queue = runsAvail[i];
            if (queue != null && !queue.isEmpty()) {
                return i;
            }
        }
        return -1;
    }

    private long splitLargeRun(long handle, int needPages) {
        assert needPages > 0;

        int totalPages = runPages(handle);
        assert needPages <= totalPages;

        int remPages = totalPages - needPages;

        if (remPages > 0) {
            int runOffset = runOffset(handle);

            // keep track of the trailing unused pages for later use
            int availOffset = runOffset + needPages;
            long availRun = toRunHandle(availOffset, remPages, 0);
            insertAvailRun(availOffset, remPages, availRun);

            // mark the leading pages as used and return them
            return toRunHandle(runOffset, needPages, 1);
        }

        // mark the whole run as used
        handle |= 1L << IS_USED_SHIFT;
        return handle;
    }

    /**
     * Create and initialize a new PoolSubpage for the normalized size that belongs to the given size
     * index. Any PoolSubpage created here is added to the subpage pool of the PoolArena that owns this
     * PoolChunk.
     *
     * @param sizeIdx the size-class index of the normalized size
     * @param head the head of the arena's subpage pool for this size class (locked by the caller)
     *
     * @return the handle of the allocated element, or -1 if no run could be allocated
     */
    private long allocateSubpage(int sizeIdx, PoolSubpage<T> head) {
        // allocate a new run that can hold an integral number of elements of this size
        int runSize = calculateRunSize(sizeIdx);

        // runSize must be a multiple of pageSize
        long runHandle = allocateRun(runSize);
        if (runHandle < 0) {
            return -1;
        }

        int runOffset = runOffset(runHandle);
        assert subpages[runOffset] == null;
        int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);

        PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
                runSize(pageShifts, runHandle), elemSize);

        subpages[runOffset] = subpage;
        return subpage.allocate();
    }

    /**
     * Free a subpage element or a run of pages. When a subpage element is freed, the subpage may be
     * returned to the subpage pool of the owning PoolArena. If the subpage is no longer in use at all,
     * the whole run backing it is freed so that it becomes available for subsequent allocations.
     *
     * @param handle handle to free
     */
    void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
        if (isSubpage(handle)) {
            int sIdx = runOffset(handle);
            PoolSubpage<T> subpage = subpages[sIdx];
            assert subpage != null;
            PoolSubpage<T> head = subpage.chunk.arena.smallSubpagePools[subpage.headIndex];
            // Obtain the head of the PoolSubpage pool that is owned by the PoolArena and lock it.
            // This is needed as we may add the subpage back and so alter the linked-list structure.
            head.lock();
            try {
                assert subpage.doNotDestroy;
                if (subpage.free(head, bitmapIdx(handle))) {
                    // the subpage is still in use, do not free the backing run
                    return;
                }
                assert !subpage.doNotDestroy;
                // null out the slot as the subpage was freed and must not be used anymore
                subpages[sIdx] = null;
            } finally {
                head.unlock();
            }
        }

        // start freeing the run
        int runSize = runSize(pageShifts, handle);

        runsAvailLock.lock();
        try {
            // collapse contiguous runs; successfully collapsed runs
            // are removed from the map and the queue
            long finalRun = collapseRuns(handle);

            // mark the run as not in use
            finalRun &= ~(1L << IS_USED_SHIFT);
            // if it was a subpage run, reset it to a plain run
            finalRun &= ~(1L << IS_SUBPAGE_SHIFT);

            insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
            freeBytes += runSize;
        } finally {
            runsAvailLock.unlock();
        }

        if (nioBuffer != null && cachedNioBuffers != null &&
                cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
            cachedNioBuffers.offer(nioBuffer);
        }
    }

    private long collapseRuns(long handle) {
        return collapseNext(collapsePast(handle));
    }

    private long collapsePast(long handle) {
        for (;;) {
            int runOffset = runOffset(handle);
            int runPages = runPages(handle);

            long pastRun = getAvailRunByOffset(runOffset - 1);
            if (pastRun == -1) {
                return handle;
            }

            int pastOffset = runOffset(pastRun);
            int pastPages = runPages(pastRun);

            // is the past run contiguous with this one (and not this run itself)?
            if (pastRun != handle && pastOffset + pastPages == runOffset) {
                // remove the past run and merge it into this one
                removeAvailRun(pastRun);
                handle = toRunHandle(pastOffset, pastPages + runPages, 0);
            } else {
                return handle;
            }
        }
    }

    private long collapseNext(long handle) {
        for (;;) {
            int runOffset = runOffset(handle);
            int runPages = runPages(handle);

            long nextRun = getAvailRunByOffset(runOffset + runPages);
            if (nextRun == -1) {
                return handle;
            }

            int nextOffset = runOffset(nextRun);
            int nextPages = runPages(nextRun);

            // is the next run contiguous with this one (and not this run itself)?
            if (nextRun != handle && runOffset + runPages == nextOffset) {
                // remove the next run and merge it into this one
                removeAvailRun(nextRun);
                handle = toRunHandle(runOffset, runPages + nextPages, 0);
            } else {
                return handle;
            }
        }
    }
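
    // Illustrative example (assumed offsets, not from the original sources): if a free run covering
    // pages [0, 2) is recorded in runsAvailMap and a run covering pages [2, 5) is being freed,
    // collapsePast() removes the [0, 2) run and yields a single handle for pages [0, 5);
    // collapseNext() does the same for a free run starting right after the freed one.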

    private static long toRunHandle(int runOffset, int runPages, int inUsed) {
        return (long) runOffset << RUN_OFFSET_SHIFT
               | (long) runPages << SIZE_SHIFT
               | (long) inUsed << IS_USED_SHIFT;
    }

    void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
                 PoolThreadCache threadCache) {
        if (isSubpage(handle)) {
            initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
        } else {
            int maxLength = runSize(pageShifts, handle);
            buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
                     reqCapacity, maxLength, arena.parent.threadCache());
        }
    }

    void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
                            PoolThreadCache threadCache) {
        int runOffset = runOffset(handle);
        int bitmapIdx = bitmapIdx(handle);

        PoolSubpage<T> s = subpages[runOffset];
        assert s.isDoNotDestroy();
        assert reqCapacity <= s.elemSize : reqCapacity + "<=" + s.elemSize;

        int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
        buf.init(this, nioBuffer, handle, offset, reqCapacity, s.elemSize, threadCache);
    }

    void incrementPinnedMemory(int delta) {
        assert delta > 0;
        pinnedBytes.add(delta);
    }

    void decrementPinnedMemory(int delta) {
        assert delta > 0;
        pinnedBytes.add(-delta);
    }

    @Override
    public int chunkSize() {
        return chunkSize;
    }

    @Override
    public int freeBytes() {
        if (this.unpooled) {
            return freeBytes;
        }
        runsAvailLock.lock();
        try {
            return freeBytes;
        } finally {
            runsAvailLock.unlock();
        }
    }

    public int pinnedBytes() {
        return (int) pinnedBytes.value();
    }

    @Override
    public String toString() {
        final int freeBytes;
        if (this.unpooled) {
            freeBytes = this.freeBytes;
        } else {
            runsAvailLock.lock();
            try {
                freeBytes = this.freeBytes;
            } finally {
                runsAvailLock.unlock();
            }
        }

        return new StringBuilder()
                .append("Chunk(")
                .append(Integer.toHexString(System.identityHashCode(this)))
                .append(": ")
                .append(usage(freeBytes))
                .append("%, ")
                .append(chunkSize - freeBytes)
                .append('/')
                .append(chunkSize)
                .append(')')
                .toString();
    }

    void destroy() {
        arena.destroyChunk(this);
    }

    static int runOffset(long handle) {
        return (int) (handle >> RUN_OFFSET_SHIFT);
    }

    static int runSize(int pageShifts, long handle) {
        return runPages(handle) << pageShifts;
    }

    static int runPages(long handle) {
        return (int) (handle >> SIZE_SHIFT & 0x7fff);
    }

    static boolean isUsed(long handle) {
        return (handle >> IS_USED_SHIFT & 1) == 1L;
    }

    static boolean isRun(long handle) {
        return !isSubpage(handle);
    }

    static boolean isSubpage(long handle) {
        return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
    }

    static int bitmapIdx(long handle) {
        return (int) handle;
    }
}