/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.netty.util.internal.shaded.org.jctools.queues.atomic;

import io.netty.util.internal.shaded.org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue;
import io.netty.util.internal.shaded.org.jctools.util.PortableJvmInfo;
import io.netty.util.internal.shaded.org.jctools.util.Pow2;
import io.netty.util.internal.shaded.org.jctools.util.RangeUtil;
import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceArray;
import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue;
import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue.Supplier;
import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueueUtil;
import io.netty.util.internal.shaded.org.jctools.queues.QueueProgressIndicators;
import io.netty.util.internal.shaded.org.jctools.queues.IndexedQueueSizeUtil;
import static io.netty.util.internal.shaded.org.jctools.queues.atomic.AtomicQueueUtil.*;

/**
 * NOTE: This class was automatically generated by io.netty.util.internal.shaded.org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator
 * which can be found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
 */
abstract class BaseMpscLinkedAtomicArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue {
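    // Cache-line padding: these otherwise unused longs (and the Pad2/Pad3 siblings below) keep the hot
    // producer and consumer index fields declared in the intermediate classes on separate cache lines,
    // avoiding false sharing between producer threads and the consumer thread.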

    long p01, p02, p03, p04, p05, p06, p07;

    long p10, p11, p12, p13, p14, p15, p16, p17;
}

/**
 * NOTE: This class was automatically generated by io.netty.util.internal.shaded.org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator
 * which can be found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
 */
abstract class BaseMpscLinkedAtomicArrayQueueProducerFields<E> extends BaseMpscLinkedAtomicArrayQueuePad1<E> {

    private static final AtomicLongFieldUpdater<BaseMpscLinkedAtomicArrayQueueProducerFields> P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicArrayQueueProducerFields.class, "producerIndex");

    private volatile long producerIndex;

    @Override
    public final long lvProducerIndex() {
        return producerIndex;
    }

    final void soProducerIndex(long newValue) {
        P_INDEX_UPDATER.lazySet(this, newValue);
    }

    final boolean casProducerIndex(long expect, long newValue) {
        return P_INDEX_UPDATER.compareAndSet(this, expect, newValue);
    }
}

/**
 * NOTE: This class was automatically generated by io.netty.util.internal.shaded.org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator
 * which can be found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
 */
abstract class BaseMpscLinkedAtomicArrayQueuePad2<E> extends BaseMpscLinkedAtomicArrayQueueProducerFields<E> {

    long p01, p02, p03, p04, p05, p06, p07;

    long p10, p11, p12, p13, p14, p15, p16, p17;
}

/**
 * NOTE: This class was automatically generated by io.netty.util.internal.shaded.org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator
 * which can be found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
 */
abstract class BaseMpscLinkedAtomicArrayQueueConsumerFields<E> extends BaseMpscLinkedAtomicArrayQueuePad2<E> {

    private static final AtomicLongFieldUpdater<BaseMpscLinkedAtomicArrayQueueConsumerFields> C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicArrayQueueConsumerFields.class, "consumerIndex");

    private volatile long consumerIndex;

    protected long consumerMask;

    protected AtomicReferenceArray<E> consumerBuffer;

    @Override
    public final long lvConsumerIndex() {
        return consumerIndex;
    }

    final long lpConsumerIndex() {
        return consumerIndex;
    }

    final void soConsumerIndex(long newValue) {
        C_INDEX_UPDATER.lazySet(this, newValue);
    }
}

/**
 * NOTE: This class was automatically generated by io.netty.util.internal.shaded.org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator
 * which can be found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
 */
abstract class BaseMpscLinkedAtomicArrayQueuePad3<E> extends BaseMpscLinkedAtomicArrayQueueConsumerFields<E> {

    long p0, p1, p2, p3, p4, p5, p6, p7;

    long p10, p11, p12, p13, p14, p15, p16, p17;
}

/**
 * NOTE: This class was automatically generated by io.netty.util.internal.shaded.org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator
 * which can be found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
 */
abstract class BaseMpscLinkedAtomicArrayQueueColdProducerFields<E> extends BaseMpscLinkedAtomicArrayQueuePad3<E> {

    private static final AtomicLongFieldUpdater<BaseMpscLinkedAtomicArrayQueueColdProducerFields> P_LIMIT_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicArrayQueueColdProducerFields.class, "producerLimit");

    private volatile long producerLimit;

    protected long producerMask;

    protected AtomicReferenceArray<E> producerBuffer;

    final long lvProducerLimit() {
        return producerLimit;
    }

    final boolean casProducerLimit(long expect, long newValue) {
        return P_LIMIT_UPDATER.compareAndSet(this, expect, newValue);
    }

    final void soProducerLimit(long newValue) {
        P_LIMIT_UPDATER.lazySet(this, newValue);
    }
}

/**
 * NOTE: This class was automatically generated by io.netty.util.internal.shaded.org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator
 * which can be found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
 *
 * An MPSC array queue which starts at <i>initialCapacity</i> and grows to <i>maxCapacity</i> in linked chunks
 * of the initial size. The queue grows only when the current buffer is full. Elements are not copied on
 * resize; instead, a link to the new buffer is stored in the old buffer for the consumer to follow.
 */
abstract class BaseMpscLinkedAtomicArrayQueue<E> extends BaseMpscLinkedAtomicArrayQueueColdProducerFields<E> implements MessagePassingQueue<E>, QueueProgressIndicators {
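    // Index encoding: producer and consumer indices advance by 2 per element, so the logical index is
    // (index >> 1). The spare lowest bit of the producer index is used as a resize-in-progress flag:
    // producers that observe an odd pIndex spin until the resizing producer publishes the new buffer.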

    // No post padding here, subclasses must add
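    // Sentinels: JUMP marks a slot whose element continues in the next (resized) buffer, telling the consumer
    // to follow the buffer link; BUFFER_CONSUMED marks an old buffer's link slot once the consumer has left it.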
    private static final Object JUMP = new Object();

    private static final Object BUFFER_CONSUMED = new Object();

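    // Return codes used by offerSlowPath(...) to tell offer(...)/fill(...) how to proceed.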
    private static final int CONTINUE_TO_P_INDEX_CAS = 0;

    private static final int RETRY = 1;

    private static final int QUEUE_FULL = 2;

    private static final int QUEUE_RESIZE = 3;

    /**
     * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the chunk size.
     *                        Must be 2 or more.
     */
    public BaseMpscLinkedAtomicArrayQueue(final int initialCapacity) {
        RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
        int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
        // leave lower bit of mask clear
        long mask = (p2capacity - 1) << 1;
        // need extra element to point at next array
        AtomicReferenceArray<E> buffer = allocateRefArray(p2capacity + 1);
        producerBuffer = buffer;
        producerMask = mask;
        consumerBuffer = buffer;
        consumerMask = mask;
        // we know it's all empty to start with
        soProducerLimit(mask);
    }

    @Override
    public int size() {
        // NOTE: because indices are on even numbers we cannot use the size util.
        /*
         * It is possible for a thread to be interrupted or rescheduled between the read of the producer and
         * consumer indices, therefore protection is required to ensure size is within valid range. In the
         * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
         * index BEFORE the producer index.
         */
        long after = lvConsumerIndex();
        long size;
        while (true) {
            final long before = after;
            final long currentProducerIndex = lvProducerIndex();
            after = lvConsumerIndex();
            if (before == after) {
                size = ((currentProducerIndex - after) >> 1);
                break;
            }
        }
        // The size is computed on longs and may exceed Integer.MAX_VALUE for effectively unbounded queues,
        // so clamp it to the int range required by Queue.size(); this is not an issue for bounded
        // indexed queues.
        if (size > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        } else {
            return (int) size;
        }
    }

    @Override
    public boolean isEmpty() {
        // nothing we can do to make this an exact method.
        return (this.lvConsumerIndex() == this.lvProducerIndex());
    }

    @Override
    public String toString() {
        return this.getClass().getName();
    }

    @Override
    public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException();
        }
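        // Fast path: claim a slot by moving pIndex forward by 2 with a CAS, then publish the element with an
        // ordered store. The slow path below handles a reached producer limit: extend the limit, report the
        // queue as full, or trigger a resize.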
        long mask;
        AtomicReferenceArray<E> buffer;
        long pIndex;
        while (true) {
            long producerLimit = lvProducerLimit();
            pIndex = lvProducerIndex();
            // lower bit is indicative of resize, if we see it we spin until it's cleared
            if ((pIndex & 1) == 1) {
                continue;
            }
            // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
            // mask/buffer may get changed by resizing -> only use for array access after successful CAS.
            mask = this.producerMask;
            buffer = this.producerBuffer;
            // assumption behind this optimization is that queue is almost always empty or near empty
            if (producerLimit <= pIndex) {
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch(result) {
                    case CONTINUE_TO_P_INDEX_CAS:
                        break;
                    case RETRY:
                        continue;
                    case QUEUE_FULL:
                        return false;
                    case QUEUE_RESIZE:
                        resize(mask, buffer, pIndex, e, null);
                        return true;
                }
            }
            if (casProducerIndex(pIndex, pIndex + 2)) {
                break;
            }
        }
        // INDEX visible before ELEMENT
        final int offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        // release element e
        soRefElement(buffer, offset, e);
        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * This implementation is correct for single consumer thread use only.
     */
    @SuppressWarnings("unchecked")
    @Override
    public E poll() {
        final AtomicReferenceArray<E> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;
        final int offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null) {
            if (index != lvProducerIndex()) {
                // A null element is not a strong enough indicator of emptiness, so we must check the producer
                // index. If the queue is indeed not empty we spin until the element is visible.
                do {
                    e = lvRefElement(buffer, offset);
                } while (e == null);
            } else {
                return null;
            }
        }
        if (e == JUMP) {
            final AtomicReferenceArray<E> nextBuffer = nextBuffer(buffer, mask);
            return newBufferPoll(nextBuffer, index);
        }
        // release element null
        soRefElement(buffer, offset, null);
        // release cIndex
        soConsumerIndex(index + 2);
        return (E) e;
    }

    /**
     * {@inheritDoc}
     * <p>
     * This implementation is correct for single consumer thread use only.
     */
    @SuppressWarnings("unchecked")
    @Override
    public E peek() {
        final AtomicReferenceArray<E> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;
        final int offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null && index != lvProducerIndex()) {
            // A null element is not a strong enough indicator of emptiness, so we check the producer index.
            // If the queue is indeed not empty we spin until the element is visible.
            do {
                e = lvRefElement(buffer, offset);
            } while (e == null);
        }
        if (e == JUMP) {
            return newBufferPeek(nextBuffer(buffer, mask), index);
        }
        return (E) e;
    }

    /**
     * We do not inline resize into this method because we do not resize on fill.
     */
    private int offerSlowPath(long mask, long pIndex, long producerLimit) {
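        // Possible outcomes: raise the producer limit and continue to the pIndex CAS, retry the whole offer,
        // report the queue as full, or claim the resize flag (pIndex + 1) and request a resize.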
        final long cIndex = lvConsumerIndex();
        long bufferCapacity = getCurrentBufferCapacity(mask);
        if (cIndex + bufferCapacity > pIndex) {
            if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
                // retry from top
                return RETRY;
            } else {
                // continue to pIndex CAS
                return CONTINUE_TO_P_INDEX_CAS;
            }
        } else // full and cannot grow
        if (availableInQueue(pIndex, cIndex) <= 0) {
            // offer should return false;
            return QUEUE_FULL;
        } else // grab index for resize -> set lower bit
        if (casProducerIndex(pIndex, pIndex + 1)) {
            // trigger a resize
            return QUEUE_RESIZE;
        } else {
            // failed resize attempt, retry from top
            return RETRY;
        }
    }

    /**
     * @return available elements in queue * 2
     */
    protected abstract long availableInQueue(long pIndex, long cIndex);

    @SuppressWarnings("unchecked")
    private AtomicReferenceArray<E> nextBuffer(final AtomicReferenceArray<E> buffer, final long mask) {
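        // The producer stored a link to the new buffer in the spare slot past the usable capacity; follow it,
        // adopt the new buffer's mask, and mark the old buffer as consumed so a WeakIterator will not follow it.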
        final int offset = nextArrayOffset(mask);
        final AtomicReferenceArray<E> nextBuffer = (AtomicReferenceArray<E>) lvRefElement(buffer, offset);
        consumerBuffer = nextBuffer;
        consumerMask = (length(nextBuffer) - 2) << 1;
        soRefElement(buffer, offset, BUFFER_CONSUMED);
        return nextBuffer;
    }

    private static int nextArrayOffset(long mask) {
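        // The next-array link lives in the extra slot appended past the last usable element; with a
        // Long.MAX_VALUE mask the circular offset calculation reduces to a plain (mask + 2) >> 1.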
        return modifiedCalcCircularRefElementOffset(mask + 2, Long.MAX_VALUE);
    }

    private E newBufferPoll(AtomicReferenceArray<E> nextBuffer, long index) {
        final int offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
        final E n = lvRefElement(nextBuffer, offset);
        if (n == null) {
            throw new IllegalStateException("new buffer must have at least one element");
        }
        soRefElement(nextBuffer, offset, null);
        soConsumerIndex(index + 2);
        return n;
    }

    private E newBufferPeek(AtomicReferenceArray<E> nextBuffer, long index) {
        final int offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
        final E n = lvRefElement(nextBuffer, offset);
        if (null == n) {
            throw new IllegalStateException("new buffer must have at least one element");
        }
        return n;
    }

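    // QueueProgressIndicators report logical element indices, so the doubled internal indices are halved.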
    @Override
    public long currentProducerIndex() {
        return lvProducerIndex() / 2;
    }

    @Override
    public long currentConsumerIndex() {
        return lvConsumerIndex() / 2;
    }

    @Override
    public abstract int capacity();

    @Override
    public boolean relaxedOffer(E e) {
        return offer(e);
    }

    @SuppressWarnings("unchecked")
    @Override
    public E relaxedPoll() {
        final AtomicReferenceArray<E> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;
        final int offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null) {
            return null;
        }
        if (e == JUMP) {
            final AtomicReferenceArray<E> nextBuffer = nextBuffer(buffer, mask);
            return newBufferPoll(nextBuffer, index);
        }
        soRefElement(buffer, offset, null);
        soConsumerIndex(index + 2);
        return (E) e;
    }

    @SuppressWarnings("unchecked")
    @Override
    public E relaxedPeek() {
        final AtomicReferenceArray<E> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;
        final int offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == JUMP) {
            return newBufferPeek(nextBuffer(buffer, mask), index);
        }
        return (E) e;
    }

    @Override
    public int fill(Supplier<E> s) {
        // result is a long because we want to have a safepoint check at regular intervals
        long result = 0;
        final int capacity = capacity();
        do {
            final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH);
            if (filled == 0) {
                return (int) result;
            }
            result += filled;
        } while (result <= capacity);
        return (int) result;
    }

    @Override
    public int fill(Supplier<E> s, int limit) {
        if (null == s)
            throw new IllegalArgumentException("supplier is null");
        if (limit < 0)
            throw new IllegalArgumentException("limit is negative:" + limit);
        if (limit == 0)
            return 0;
        long mask;
        AtomicReferenceArray<E> buffer;
        long pIndex;
        int claimedSlots;
        while (true) {
            long producerLimit = lvProducerLimit();
            pIndex = lvProducerIndex();
            // lower bit is indicative of resize, if we see it we spin until it's cleared
            if ((pIndex & 1) == 1) {
                continue;
            }
            // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
            // NOTE: mask/buffer may get changed by resizing -> only use for array access after successful CAS.
            // Only by virtue of loading them between the lvProducerIndex and a successful casProducerIndex are they
            // safe to use.
            mask = this.producerMask;
            buffer = this.producerBuffer;
            // a successful CAS ties the ordering, lv(pIndex) -> [mask/buffer] -> cas(pIndex)
            // we want 'limit' slots, but will settle for whatever is visible to 'producerLimit'
            // -> producerLimit >= batchIndex
            long batchIndex = Math.min(producerLimit, pIndex + 2L * limit);
            if (pIndex >= producerLimit) {
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch(result) {
                    case CONTINUE_TO_P_INDEX_CAS:
                    // offer slow path verifies only one slot ahead, we cannot rely on indication here
                    case RETRY:
                        continue;
                    case QUEUE_FULL:
                        return 0;
                    case QUEUE_RESIZE:
                        resize(mask, buffer, pIndex, null, s);
                        return 1;
                }
            }
            // claim limit slots at once
            if (casProducerIndex(pIndex, batchIndex)) {
                claimedSlots = (int) ((batchIndex - pIndex) / 2);
                break;
            }
        }
        for (int i = 0; i < claimedSlots; i++) {
            final int offset = modifiedCalcCircularRefElementOffset(pIndex + 2L * i, mask);
            soRefElement(buffer, offset, s.get());
        }
        return claimedSlots;
    }

    @Override
    public void fill(Supplier<E> s, WaitStrategy wait, ExitCondition exit) {
        MessagePassingQueueUtil.fill(this, s, wait, exit);
    }

    @Override
    public int drain(Consumer<E> c) {
        return drain(c, capacity());
    }

    @Override
    public int drain(Consumer<E> c, int limit) {
        return MessagePassingQueueUtil.drain(this, c, limit);
    }

    @Override
    public void drain(Consumer<E> c, WaitStrategy wait, ExitCondition exit) {
        MessagePassingQueueUtil.drain(this, c, wait, exit);
    }

    /**
     * Get an iterator for this queue. This method is thread safe.
     * <p>
     * The iterator provides a best-effort snapshot of the elements in the queue.
     * The returned iterator is not guaranteed to return elements in queue order,
     * and races with the consumer thread may cause gaps in the sequence of returned elements.
     * Like {@link #relaxedPoll()}, the iterator may not immediately return newly inserted elements.
     *
     * @return The iterator.
     */
    @Override
    public Iterator<E> iterator() {
        return new WeakIterator<E>(consumerBuffer, lvConsumerIndex(), lvProducerIndex());
    }

    /**
     * NOTE: This class was automatically generated by io.netty.util.internal.shaded.org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator
     * which can be found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
     */
    private static class WeakIterator<E> implements Iterator<E> {
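        // Best-effort iterator: it walks logical indices (queue indices shifted right by one) from the consumer
        // index captured at creation up to the captured producer index, following buffer links and giving up if
        // it reaches a buffer the consumer has already marked as consumed.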

        private final long pIndex;

        private long nextIndex;

        private E nextElement;

        private AtomicReferenceArray<E> currentBuffer;

        private int mask;

        WeakIterator(AtomicReferenceArray<E> currentBuffer, long cIndex, long pIndex) {
            this.pIndex = pIndex >> 1;
            this.nextIndex = cIndex >> 1;
            setBuffer(currentBuffer);
            nextElement = getNext();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("remove");
        }

        @Override
        public boolean hasNext() {
            return nextElement != null;
        }

        @Override
        public E next() {
            final E e = nextElement;
            if (e == null) {
                throw new NoSuchElementException();
            }
            nextElement = getNext();
            return e;
        }

        private void setBuffer(AtomicReferenceArray<E> buffer) {
            this.currentBuffer = buffer;
            this.mask = length(buffer) - 2;
        }

        @SuppressWarnings("unchecked")
        private E getNext() {
            while (nextIndex < pIndex) {
                long index = nextIndex++;
                E e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask));
                // skip removed/not yet visible elements
                if (e == null) {
                    continue;
                }
                // not null && not JUMP -> found next element
                if (e != JUMP) {
                    return e;
                }
                // need to jump to the next buffer
                int nextBufferIndex = mask + 1;
                Object nextBuffer = lvRefElement(currentBuffer, calcRefElementOffset(nextBufferIndex));
                if (nextBuffer == BUFFER_CONSUMED || nextBuffer == null) {
                    // Consumer may have passed us, or the next buffer is not visible yet: drop out early
                    return null;
                }
                setBuffer((AtomicReferenceArray<E>) nextBuffer);
                // now with the new array retry the load, it can't be a JUMP, but we need to repeat same index
                e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask));
                // skip removed/not yet visible elements
                if (e == null) {
                    continue;
                } else {
                    return e;
                }
            }
            return null;
        }
    }

    private void resize(long oldMask, AtomicReferenceArray<E> oldBuffer, long pIndex, E e, Supplier<E> s) {
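        // The caller has already claimed the resize by CASing pIndex to pIndex + 1 (odd = resize in progress).
        // Allocate the new buffer, publish the element there, link the old buffer to the new one, restore an
        // even pIndex and only then mark the old slot with JUMP so the consumer knows to follow the link.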
        assert (e != null && s == null) || (e == null && s != null);
        int newBufferLength = getNextBufferSize(oldBuffer);
        final AtomicReferenceArray<E> newBuffer;
        try {
            newBuffer = allocateRefArray(newBufferLength);
        } catch (OutOfMemoryError oom) {
            assert lvProducerIndex() == pIndex + 1;
            soProducerIndex(pIndex);
            throw oom;
        }
        producerBuffer = newBuffer;
        final int newMask = (newBufferLength - 2) << 1;
        producerMask = newMask;
        final int offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
        final int offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
        // element in new array
        soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
        // buffer linked
        soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);
        // ASSERT code
        final long cIndex = lvConsumerIndex();
        final long availableInQueue = availableInQueue(pIndex, cIndex);
        RangeUtil.checkPositive(availableInQueue, "availableInQueue");
        // Invalidate racing CASs
        // We never set the limit beyond the bounds of a buffer
        soProducerLimit(pIndex + Math.min(newMask, availableInQueue));
        // make resize visible to the other producers
        soProducerIndex(pIndex + 2);
        // INDEX visible before ELEMENT, consistent with consumer expectation
        // make resize visible to consumer
        soRefElement(oldBuffer, offsetInOld, JUMP);
    }

    /**
     * @return the next buffer size (inclusive of the next array pointer slot)
     */
    protected abstract int getNextBufferSize(AtomicReferenceArray<E> buffer);

    /**
     * @return current buffer capacity for elements (excluding next pointer and jump entry) * 2
     */
    protected abstract long getCurrentBufferCapacity(long mask);
679 }