查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.buffer;
17  
18  import io.netty.util.internal.LongCounter;
19  import io.netty.util.internal.PlatformDependent;
20  
21  import java.nio.ByteBuffer;
22  import java.util.ArrayDeque;
23  import java.util.Deque;
24  import java.util.PriorityQueue;
25  import java.util.concurrent.locks.ReentrantLock;
26  
27  /**
28   * Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
29   *
30   * Notation: The following terms are important to understand the code
31   * > page  - a page is the smallest unit of memory chunk that can be allocated
32   * > run   - a run is a collection of pages
33   * > chunk - a chunk is a collection of runs
34   * > in this code chunkSize = maxPages * pageSize
35   *
36   * To begin we allocate a byte array of size = chunkSize
37   * Whenever a ByteBuf of given size needs to be created we search for the first position
38   * in the byte array that has enough empty space to accommodate the requested size and
39   * return a (long) handle that encodes this offset information, (this memory segment is then
40   * marked as reserved so it is always used by exactly one ByteBuf and no more)
41   *
42   * For simplicity all sizes are normalized according to {@link PoolArena#sizeClass#size2SizeIdx(int)} method.
43   * This ensures that when we request for memory segments of size > pageSize the normalizedCapacity
44   * equals the next nearest size in {@link SizeClasses}.
45   *
46   *
47   *  A chunk has the following layout:
48   *
49   *     /-----------------\
50   *     | run             |
51   *     |                 |
52   *     |                 |
53   *     |-----------------|
54   *     | run             |
55   *     |                 |
56   *     |-----------------|
57   *     | unalloctated    |
58   *     | (freed)         |
59   *     |                 |
60   *     |-----------------|
61   *     | subpage         |
62   *     |-----------------|
63   *     | unallocated     |
64   *     | (freed)         |
65   *     | ...             |
66   *     | ...             |
67   *     | ...             |
68   *     |                 |
69   *     |                 |
70   *     |                 |
71   *     \-----------------/
72   *
73   *
74   * handle:
75   * -------
76   * a handle is a long number, the bit layout of a run looks like:
77   *
78   * oooooooo ooooooos ssssssss ssssssue bbbbbbbb bbbbbbbb bbbbbbbb bbbbbbbb
79   *
80   * o: runOffset (page offset in the chunk), 15bit
81   * s: size (number of pages) of this run, 15bit
82   * u: isUsed?, 1bit
83   * e: isSubpage?, 1bit
84   * b: bitmapIdx of subpage, zero if it's not subpage, 32bit
85   *
86   * runsAvailMap:
87   * ------
88   * a map which manages all runs (used and not in used).
89   * For each run, the first runOffset and last runOffset are stored in runsAvailMap.
90   * key: runOffset
91   * value: handle
92   *
93   * runsAvail:
94   * ----------
95   * an array of {@link PriorityQueue}.
96   * Each queue manages same size of runs.
97   * Runs are sorted by offset, so that we always allocate runs with smaller offset.
98   *
99   *
100  * Algorithm:
101  * ----------
102  *
103  *   As we allocate runs, we update values stored in runsAvailMap and runsAvail so that the property is maintained.
104  *
105  * Initialization -
106  *  In the beginning we store the initial run which is the whole chunk.
107  *  The initial run:
108  *  runOffset = 0
109  *  size = chunkSize
110  *  isUsed = no
111  *  isSubpage = no
112  *  bitmapIdx = 0
113  *
114  *
115  * Algorithm: [allocateRun(size)]
116  * ----------
117  * 1) find the first avail run using in runsAvails according to size
118  * 2) if pages of run is larger than request pages then split it, and save the tailing run
119  *    for later using
120  *
121  * Algorithm: [allocateSubpage(size)]
122  * ----------
123  * 1) find a not full subpage according to size.
124  *    if it already exists just return, otherwise allocate a new PoolSubpage and call init()
125  *    note that this subpage object is added to subpagesPool in the PoolArena when we init() it
126  * 2) call subpage.allocate()
127  *
128  * Algorithm: [free(handle, length, nioBuffer)]
129  * ----------
130  * 1) if it is a subpage, return the slab back into this subpage
131  * 2) if the subpage is not used or it is a run, then start free this run
132  * 3) merge continuous avail runs
133  * 4) save the merged run
134  *
135  */
136 final class PoolChunk<T> implements PoolChunkMetric {
137     private static final int SIZE_BIT_LENGTH = 15;
138     private static final int INUSED_BIT_LENGTH = 1;
139     private static final int SUBPAGE_BIT_LENGTH = 1;
140     private static final int BITMAP_IDX_BIT_LENGTH = 32;
141 
142     static final int IS_SUBPAGE_SHIFT = BITMAP_IDX_BIT_LENGTH;
143     static final int IS_USED_SHIFT = SUBPAGE_BIT_LENGTH + IS_SUBPAGE_SHIFT;
144     static final int SIZE_SHIFT = INUSED_BIT_LENGTH + IS_USED_SHIFT;
145     static final int RUN_OFFSET_SHIFT = SIZE_BIT_LENGTH + SIZE_SHIFT;
146 
147     final PoolArena<T> arena;
148     final Object base;
149     final T memory;
150     final boolean unpooled;
151 
152     /**
153      * store the first page and last page of each avail run
154      */
155     private final LongLongHashMap runsAvailMap;
156 
157     /**
158      * manage all avail runs
159      */
160     private final IntPriorityQueue[] runsAvail;
161 
162     private final ReentrantLock runsAvailLock;
163 
164     /**
165      * manage all subpages in this chunk
166      */
167     private final PoolSubpage<T>[] subpages;
168 
169     /**
170      * Accounting of pinned memory – memory that is currently in use by ByteBuf instances.
171      */
172     private final LongCounter pinnedBytes = PlatformDependent.newLongCounter();
173 
174     private final int pageSize;
175     private final int pageShifts;
176     private final int chunkSize;
177 
178     // Use as cache for ByteBuffer created from the memory. These are just duplicates and so are only a container
179     // around the memory itself. These are often needed for operations within the Pooled*ByteBuf and so
180     // may produce extra GC, which can be greatly reduced by caching the duplicates.
181     //
182     // This may be null if the PoolChunk is unpooled as pooling the ByteBuffer instances does not make any sense here.
183     private final Deque<ByteBuffer> cachedNioBuffers;
184 
185     int freeBytes;
186 
187     PoolChunkList<T> parent;
188     PoolChunk<T> prev;
189     PoolChunk<T> next;
190 
191     // TODO: Test if adding padding helps under contention
192     //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
193 
194     @SuppressWarnings("unchecked")
195     PoolChunk(PoolArena<T> arena, Object base, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx) {
196         unpooled = false;
197         this.arena = arena;
198         this.base = base;
199         this.memory = memory;
200         this.pageSize = pageSize;
201         this.pageShifts = pageShifts;
202         this.chunkSize = chunkSize;
203         freeBytes = chunkSize;
204 
205         runsAvail = newRunsAvailqueueArray(maxPageIdx);
206         runsAvailLock = new ReentrantLock();
207         runsAvailMap = new LongLongHashMap(-1);
208         subpages = new PoolSubpage[chunkSize >> pageShifts];
209 
210         //insert initial run, offset = 0, pages = chunkSize / pageSize
211         int pages = chunkSize >> pageShifts;
212         long initHandle = (long) pages << SIZE_SHIFT;
213         insertAvailRun(0, pages, initHandle);
214 
215         cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
216     }
217 
218     /** Creates a special chunk that is not pooled. */
219     PoolChunk(PoolArena<T> arena, Object base, T memory, int size) {
220         unpooled = true;
221         this.arena = arena;
222         this.base = base;
223         this.memory = memory;
224         pageSize = 0;
225         pageShifts = 0;
226         runsAvailMap = null;
227         runsAvail = null;
228         runsAvailLock = null;
229         subpages = null;
230         chunkSize = size;
231         cachedNioBuffers = null;
232     }
233 
234     private static IntPriorityQueue[] newRunsAvailqueueArray(int size) {
235         IntPriorityQueue[] queueArray = new IntPriorityQueue[size];
236         for (int i = 0; i < queueArray.length; i++) {
237             queueArray[i] = new IntPriorityQueue();
238         }
239         return queueArray;
240     }
241 
242     private void insertAvailRun(int runOffset, int pages, long handle) {
243         int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(pages);
244         IntPriorityQueue queue = runsAvail[pageIdxFloor];
245         assert isRun(handle);
246         queue.offer((int) (handle >> BITMAP_IDX_BIT_LENGTH));
247 
248         //insert first page of run
249         insertAvailRun0(runOffset, handle);
250         if (pages > 1) {
251             //insert last page of run
252             insertAvailRun0(lastPage(runOffset, pages), handle);
253         }
254     }
255 
256     private void insertAvailRun0(int runOffset, long handle) {
257         long pre = runsAvailMap.put(runOffset, handle);
258         assert pre == -1;
259     }
260 
261     private void removeAvailRun(long handle) {
262         int pageIdxFloor = arena.sizeClass.pages2pageIdxFloor(runPages(handle));
263         runsAvail[pageIdxFloor].remove((int) (handle >> BITMAP_IDX_BIT_LENGTH));
264         removeAvailRun0(handle);
265     }
266 
267     private void removeAvailRun0(long handle) {
268         int runOffset = runOffset(handle);
269         int pages = runPages(handle);
270         //remove first page of run
271         runsAvailMap.remove(runOffset);
272         if (pages > 1) {
273             //remove last page of run
274             runsAvailMap.remove(lastPage(runOffset, pages));
275         }
276     }
277 
278     private static int lastPage(int runOffset, int pages) {
279         return runOffset + pages - 1;
280     }
281 
282     private long getAvailRunByOffset(int runOffset) {
283         return runsAvailMap.get(runOffset);
284     }
285 
286     @Override
287     public int usage() {
288         final int freeBytes;
289         if (this.unpooled) {
290             freeBytes = this.freeBytes;
291         } else {
292             runsAvailLock.lock();
293             try {
294                 freeBytes = this.freeBytes;
295             } finally {
296                 runsAvailLock.unlock();
297             }
298         }
299         return usage(freeBytes);
300     }
301 
302     private int usage(int freeBytes) {
303         if (freeBytes == 0) {
304             return 100;
305         }
306 
307         int freePercentage = (int) (freeBytes * 100L / chunkSize);
308         if (freePercentage == 0) {
309             return 99;
310         }
311         return 100 - freePercentage;
312     }
313 
314     boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
315         final long handle;
316         if (sizeIdx <= arena.sizeClass.smallMaxSizeIdx) {
317             final PoolSubpage<T> nextSub;
318             // small
319             // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
320             // This is need as we may add it back and so alter the linked-list structure.
321             PoolSubpage<T> head = arena.smallSubpagePools[sizeIdx];
322             head.lock();
323             try {
324                 nextSub = head.next;
325                 if (nextSub != head) {
326                     assert nextSub.doNotDestroy && nextSub.elemSize == arena.sizeClass.sizeIdx2size(sizeIdx) :
327                             "doNotDestroy=" + nextSub.doNotDestroy + ", elemSize=" + nextSub.elemSize + ", sizeIdx=" +
328                                     sizeIdx;
329                     handle = nextSub.allocate();
330                     assert handle >= 0;
331                     assert isSubpage(handle);
332                     nextSub.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache);
333                     return true;
334                 }
335                 handle = allocateSubpage(sizeIdx, head);
336                 if (handle < 0) {
337                     return false;
338                 }
339                 assert isSubpage(handle);
340             } finally {
341                 head.unlock();
342             }
343         } else {
344             // normal
345             // runSize must be multiple of pageSize
346             int runSize = arena.sizeClass.sizeIdx2size(sizeIdx);
347             handle = allocateRun(runSize);
348             if (handle < 0) {
349                 return false;
350             }
351             assert !isSubpage(handle);
352         }
353 
354         ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
355         initBuf(buf, nioBuffer, handle, reqCapacity, cache);
356         return true;
357     }
358 
359     private long allocateRun(int runSize) {
360         int pages = runSize >> pageShifts;
361         int pageIdx = arena.sizeClass.pages2pageIdx(pages);
362 
363         runsAvailLock.lock();
364         try {
365             //find first queue which has at least one big enough run
366             int queueIdx = runFirstBestFit(pageIdx);
367             if (queueIdx == -1) {
368                 return -1;
369             }
370 
371             //get run with min offset in this queue
372             IntPriorityQueue queue = runsAvail[queueIdx];
373             long handle = queue.poll();
374             assert handle != IntPriorityQueue.NO_VALUE;
375             handle <<= BITMAP_IDX_BIT_LENGTH;
376             assert !isUsed(handle) : "invalid handle: " + handle;
377 
378             removeAvailRun0(handle);
379 
380             handle = splitLargeRun(handle, pages);
381 
382             int pinnedSize = runSize(pageShifts, handle);
383             freeBytes -= pinnedSize;
384             return handle;
385         } finally {
386             runsAvailLock.unlock();
387         }
388     }
389 
390     private int calculateRunSize(int sizeIdx) {
391         int maxElements = 1 << pageShifts - SizeClasses.LOG2_QUANTUM;
392         int runSize = 0;
393         int nElements;
394 
395         final int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
396 
397         //find lowest common multiple of pageSize and elemSize
398         do {
399             runSize += pageSize;
400             nElements = runSize / elemSize;
401         } while (nElements < maxElements && runSize != nElements * elemSize);
402 
403         while (nElements > maxElements) {
404             runSize -= pageSize;
405             nElements = runSize / elemSize;
406         }
407 
408         assert nElements > 0;
409         assert runSize <= chunkSize;
410         assert runSize >= elemSize;
411 
412         return runSize;
413     }
414 
415     private int runFirstBestFit(int pageIdx) {
416         if (freeBytes == chunkSize) {
417             return arena.sizeClass.nPSizes - 1;
418         }
419         for (int i = pageIdx; i < arena.sizeClass.nPSizes; i++) {
420             IntPriorityQueue queue = runsAvail[i];
421             if (queue != null && !queue.isEmpty()) {
422                 return i;
423             }
424         }
425         return -1;
426     }
427 
428     private long splitLargeRun(long handle, int needPages) {
429         assert needPages > 0;
430 
431         int totalPages = runPages(handle);
432         assert needPages <= totalPages;
433 
434         int remPages = totalPages - needPages;
435 
436         if (remPages > 0) {
437             int runOffset = runOffset(handle);
438 
439             // keep track of trailing unused pages for later use
440             int availOffset = runOffset + needPages;
441             long availRun = toRunHandle(availOffset, remPages, 0);
442             insertAvailRun(availOffset, remPages, availRun);
443 
444             // not avail
445             return toRunHandle(runOffset, needPages, 1);
446         }
447 
448         //mark it as used
449         handle |= 1L << IS_USED_SHIFT;
450         return handle;
451     }
452 
453     /**
454      * Create / initialize a new PoolSubpage of normCapacity. Any PoolSubpage created / initialized here is added to
455      * subpage pool in the PoolArena that owns this PoolChunk.
456      *
457      * @param sizeIdx sizeIdx of normalized size
458      * @param head head of subpages
459      *
460      * @return index in memoryMap
461      */
462     private long allocateSubpage(int sizeIdx, PoolSubpage<T> head) {
463         //allocate a new run
464         int runSize = calculateRunSize(sizeIdx);
465         //runSize must be multiples of pageSize
466         long runHandle = allocateRun(runSize);
467         if (runHandle < 0) {
468             return -1;
469         }
470 
471         int runOffset = runOffset(runHandle);
472         assert subpages[runOffset] == null;
473         int elemSize = arena.sizeClass.sizeIdx2size(sizeIdx);
474 
475         PoolSubpage<T> subpage = new PoolSubpage<T>(head, this, pageShifts, runOffset,
476                 runSize(pageShifts, runHandle), elemSize);
477 
478         subpages[runOffset] = subpage;
479         return subpage.allocate();
480     }
481 
482     /**
483      * Free a subpage or a run of pages When a subpage is freed from PoolSubpage, it might be added back to subpage pool
484      * of the owning PoolArena. If the subpage pool in PoolArena has at least one other PoolSubpage of given elemSize,
485      * we can completely free the owning Page so it is available for subsequent allocations
486      *
487      * @param handle handle to free
488      */
489     void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
490         if (isSubpage(handle)) {
491             int sIdx = runOffset(handle);
492             PoolSubpage<T> subpage = subpages[sIdx];
493             assert subpage != null;
494             PoolSubpage<T> head = subpage.chunk.arena.smallSubpagePools[subpage.headIndex];
495             // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
496             // This is need as we may add it back and so alter the linked-list structure.
497             head.lock();
498             try {
499                 assert subpage.doNotDestroy;
500                 if (subpage.free(head, bitmapIdx(handle))) {
501                     //the subpage is still used, do not free it
502                     return;
503                 }
504                 assert !subpage.doNotDestroy;
505                 // Null out slot in the array as it was freed and we should not use it anymore.
506                 subpages[sIdx] = null;
507             } finally {
508                 head.unlock();
509             }
510         }
511 
512         int runSize = runSize(pageShifts, handle);
513         //start free run
514         runsAvailLock.lock();
515         try {
516             // collapse continuous runs, successfully collapsed runs
517             // will be removed from runsAvail and runsAvailMap
518             long finalRun = collapseRuns(handle);
519 
520             //set run as not used
521             finalRun &= ~(1L << IS_USED_SHIFT);
522             //if it is a subpage, set it to run
523             finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
524 
525             insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
526             freeBytes += runSize;
527         } finally {
528             runsAvailLock.unlock();
529         }
530 
531         if (nioBuffer != null && cachedNioBuffers != null &&
532             cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
533             cachedNioBuffers.offer(nioBuffer);
534         }
535     }
536 
537     private long collapseRuns(long handle) {
538         return collapseNext(collapsePast(handle));
539     }
540 
541     private long collapsePast(long handle) {
542         for (;;) {
543             int runOffset = runOffset(handle);
544             int runPages = runPages(handle);
545 
546             long pastRun = getAvailRunByOffset(runOffset - 1);
547             if (pastRun == -1) {
548                 return handle;
549             }
550 
551             int pastOffset = runOffset(pastRun);
552             int pastPages = runPages(pastRun);
553 
554             //is continuous
555             if (pastRun != handle && pastOffset + pastPages == runOffset) {
556                 //remove past run
557                 removeAvailRun(pastRun);
558                 handle = toRunHandle(pastOffset, pastPages + runPages, 0);
559             } else {
560                 return handle;
561             }
562         }
563     }
564 
565     private long collapseNext(long handle) {
566         for (;;) {
567             int runOffset = runOffset(handle);
568             int runPages = runPages(handle);
569 
570             long nextRun = getAvailRunByOffset(runOffset + runPages);
571             if (nextRun == -1) {
572                 return handle;
573             }
574 
575             int nextOffset = runOffset(nextRun);
576             int nextPages = runPages(nextRun);
577 
578             //is continuous
579             if (nextRun != handle && runOffset + runPages == nextOffset) {
580                 //remove next run
581                 removeAvailRun(nextRun);
582                 handle = toRunHandle(runOffset, runPages + nextPages, 0);
583             } else {
584                 return handle;
585             }
586         }
587     }
588 
589     private static long toRunHandle(int runOffset, int runPages, int inUsed) {
590         return (long) runOffset << RUN_OFFSET_SHIFT
591                | (long) runPages << SIZE_SHIFT
592                | (long) inUsed << IS_USED_SHIFT;
593     }
594 
595     void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
596                  PoolThreadCache threadCache) {
597         if (isSubpage(handle)) {
598             initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
599         } else {
600             int maxLength = runSize(pageShifts, handle);
601             buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
602                     reqCapacity, maxLength, arena.parent.threadCache());
603         }
604     }
605 
606     void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
607                             PoolThreadCache threadCache) {
608         int runOffset = runOffset(handle);
609         int bitmapIdx = bitmapIdx(handle);
610 
611         PoolSubpage<T> s = subpages[runOffset];
612         assert s.isDoNotDestroy();
613         assert reqCapacity <= s.elemSize : reqCapacity + "<=" + s.elemSize;
614 
615         int offset = (runOffset << pageShifts) + bitmapIdx * s.elemSize;
616         buf.init(this, nioBuffer, handle, offset, reqCapacity, s.elemSize, threadCache);
617     }
618 
619     void incrementPinnedMemory(int delta) {
620         assert delta > 0;
621         pinnedBytes.add(delta);
622     }
623 
624     void decrementPinnedMemory(int delta) {
625         assert delta > 0;
626         pinnedBytes.add(-delta);
627     }
628 
629     @Override
630     public int chunkSize() {
631         return chunkSize;
632     }
633 
634     @Override
635     public int freeBytes() {
636         if (this.unpooled) {
637             return freeBytes;
638         }
639         runsAvailLock.lock();
640         try {
641             return freeBytes;
642         } finally {
643             runsAvailLock.unlock();
644         }
645     }
646 
647     public int pinnedBytes() {
648         return (int) pinnedBytes.value();
649     }
650 
651     @Override
652     public String toString() {
653         final int freeBytes;
654         if (this.unpooled) {
655             freeBytes = this.freeBytes;
656         } else {
657             runsAvailLock.lock();
658             try {
659                 freeBytes = this.freeBytes;
660             } finally {
661                 runsAvailLock.unlock();
662             }
663         }
664 
665         return new StringBuilder()
666                 .append("Chunk(")
667                 .append(Integer.toHexString(System.identityHashCode(this)))
668                 .append(": ")
669                 .append(usage(freeBytes))
670                 .append("%, ")
671                 .append(chunkSize - freeBytes)
672                 .append('/')
673                 .append(chunkSize)
674                 .append(')')
675                 .toString();
676     }
677 
678     void destroy() {
679         arena.destroyChunk(this);
680     }
681 
682     static int runOffset(long handle) {
683         return (int) (handle >> RUN_OFFSET_SHIFT);
684     }
685 
686     static int runSize(int pageShifts, long handle) {
687         return runPages(handle) << pageShifts;
688     }
689 
690     static int runPages(long handle) {
691         return (int) (handle >> SIZE_SHIFT & 0x7fff);
692     }
693 
694     static boolean isUsed(long handle) {
695         return (handle >> IS_USED_SHIFT & 1) == 1L;
696     }
697 
698     static boolean isRun(long handle) {
699         return !isSubpage(handle);
700     }
701 
702     static boolean isSubpage(long handle) {
703         return (handle >> IS_SUBPAGE_SHIFT & 1) == 1L;
704     }
705 
706     static int bitmapIdx(long handle) {
707         return (int) handle;
708     }
709 }