1
2
3
4
5
6
7
8
9
10
11
12
13
14 package io.netty.util.internal.shaded.org.jctools.queues.atomic;
15
16 import io.netty.util.internal.shaded.org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue;
17 import io.netty.util.internal.shaded.org.jctools.util.PortableJvmInfo;
18 import io.netty.util.internal.shaded.org.jctools.util.Pow2;
19 import io.netty.util.internal.shaded.org.jctools.util.RangeUtil;
20 import java.util.AbstractQueue;
21 import java.util.Iterator;
22 import java.util.NoSuchElementException;
23 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
24 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
25 import java.util.concurrent.atomic.AtomicReferenceArray;
26 import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue;
27 import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueue.Supplier;
28 import io.netty.util.internal.shaded.org.jctools.queues.MessagePassingQueueUtil;
29 import io.netty.util.internal.shaded.org.jctools.queues.QueueProgressIndicators;
30 import io.netty.util.internal.shaded.org.jctools.queues.IndexedQueueSizeUtil;
31 import static io.netty.util.internal.shaded.org.jctools.queues.atomic.AtomicQueueUtil.*;
32
33
34
35
36
37 abstract class BaseMpscLinkedAtomicArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue {
38
39 long p01, p02, p03, p04, p05, p06, p07;
40
41 long p10, p11, p12, p13, p14, p15, p16, p17;
42 }
43
44
45
46
47
48 abstract class BaseMpscLinkedAtomicArrayQueueProducerFields<E> extends BaseMpscLinkedAtomicArrayQueuePad1<E> {
49
50 private static final AtomicLongFieldUpdater<BaseMpscLinkedAtomicArrayQueueProducerFields> P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicArrayQueueProducerFields.class, "producerIndex");
51
52 private volatile long producerIndex;
53
54 @Override
55 public final long lvProducerIndex() {
56 return producerIndex;
57 }
58
59 final void soProducerIndex(long newValue) {
60 P_INDEX_UPDATER.lazySet(this, newValue);
61 }
62
63 final boolean casProducerIndex(long expect, long newValue) {
64 return P_INDEX_UPDATER.compareAndSet(this, expect, newValue);
65 }
66 }
67
68
69
70
71
72 abstract class BaseMpscLinkedAtomicArrayQueuePad2<E> extends BaseMpscLinkedAtomicArrayQueueProducerFields<E> {
73
74 long p01, p02, p03, p04, p05, p06, p07;
75
76 long p10, p11, p12, p13, p14, p15, p16, p17;
77 }
78
79
80
81
82
83 abstract class BaseMpscLinkedAtomicArrayQueueConsumerFields<E> extends BaseMpscLinkedAtomicArrayQueuePad2<E> {
84
85 private static final AtomicLongFieldUpdater<BaseMpscLinkedAtomicArrayQueueConsumerFields> C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicArrayQueueConsumerFields.class, "consumerIndex");
86
87 private volatile long consumerIndex;
88
89 protected long consumerMask;
90
91 protected AtomicReferenceArray<E> consumerBuffer;
92
93 @Override
94 public final long lvConsumerIndex() {
95 return consumerIndex;
96 }
97
98 final long lpConsumerIndex() {
99 return consumerIndex;
100 }
101
102 final void soConsumerIndex(long newValue) {
103 C_INDEX_UPDATER.lazySet(this, newValue);
104 }
105 }
106
107
108
109
110
111 abstract class BaseMpscLinkedAtomicArrayQueuePad3<E> extends BaseMpscLinkedAtomicArrayQueueConsumerFields<E> {
112
113 long p0, p1, p2, p3, p4, p5, p6, p7;
114
115 long p10, p11, p12, p13, p14, p15, p16, p17;
116 }
117
118
119
120
121
122 abstract class BaseMpscLinkedAtomicArrayQueueColdProducerFields<E> extends BaseMpscLinkedAtomicArrayQueuePad3<E> {
123
124 private static final AtomicLongFieldUpdater<BaseMpscLinkedAtomicArrayQueueColdProducerFields> P_LIMIT_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicArrayQueueColdProducerFields.class, "producerLimit");
125
126 private volatile long producerLimit;
127
128 protected long producerMask;
129
130 protected AtomicReferenceArray<E> producerBuffer;
131
132 final long lvProducerLimit() {
133 return producerLimit;
134 }
135
136 final boolean casProducerLimit(long expect, long newValue) {
137 return P_LIMIT_UPDATER.compareAndSet(this, expect, newValue);
138 }
139
140 final void soProducerLimit(long newValue) {
141 P_LIMIT_UPDATER.lazySet(this, newValue);
142 }
143 }
144
145
146
147
148
149
150
151
152
153 abstract class BaseMpscLinkedAtomicArrayQueue<E> extends BaseMpscLinkedAtomicArrayQueueColdProducerFields<E> implements MessagePassingQueue<E>, QueueProgressIndicators {
154
155
156 private static final Object JUMP = new Object();
157
158 private static final Object BUFFER_CONSUMED = new Object();
159
160 private static final int CONTINUE_TO_P_INDEX_CAS = 0;
161
162 private static final int RETRY = 1;
163
164 private static final int QUEUE_FULL = 2;
165
166 private static final int QUEUE_RESIZE = 3;
167
168
169
170
171
172 public BaseMpscLinkedAtomicArrayQueue(final int initialCapacity) {
173 RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");
174 int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
175
176 long mask = (p2capacity - 1) << 1;
177
178 AtomicReferenceArray<E> buffer = allocateRefArray(p2capacity + 1);
179 producerBuffer = buffer;
180 producerMask = mask;
181 consumerBuffer = buffer;
182 consumerMask = mask;
183
184 soProducerLimit(mask);
185 }
186
187 @Override
188 public int size() {
189
190
191
192
193
194
195
196 long after = lvConsumerIndex();
197 long size;
198 while (true) {
199 final long before = after;
200 final long currentProducerIndex = lvProducerIndex();
201 after = lvConsumerIndex();
202 if (before == after) {
203 size = ((currentProducerIndex - after) >> 1);
204 break;
205 }
206 }
207
208 if (size > Integer.MAX_VALUE) {
209 return Integer.MAX_VALUE;
210 } else {
211 return (int) size;
212 }
213 }
214
215 @Override
216 public boolean isEmpty() {
217
218 return (this.lvConsumerIndex() == this.lvProducerIndex());
219 }
220
221 @Override
222 public String toString() {
223 return this.getClass().getName();
224 }
225
226 @Override
227 public boolean offer(final E e) {
228 if (null == e) {
229 throw new NullPointerException();
230 }
231 long mask;
232 AtomicReferenceArray<E> buffer;
233 long pIndex;
234 while (true) {
235 long producerLimit = lvProducerLimit();
236 pIndex = lvProducerIndex();
237
238 if ((pIndex & 1) == 1) {
239 continue;
240 }
241
242
243 mask = this.producerMask;
244 buffer = this.producerBuffer;
245
246 if (producerLimit <= pIndex) {
247 int result = offerSlowPath(mask, pIndex, producerLimit);
248 switch(result) {
249 case CONTINUE_TO_P_INDEX_CAS:
250 break;
251 case RETRY:
252 continue;
253 case QUEUE_FULL:
254 return false;
255 case QUEUE_RESIZE:
256 resize(mask, buffer, pIndex, e, null);
257 return true;
258 }
259 }
260 if (casProducerIndex(pIndex, pIndex + 2)) {
261 break;
262 }
263 }
264
265 final int offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
266
267 soRefElement(buffer, offset, e);
268 return true;
269 }
270
271
272
273
274
275
276 @SuppressWarnings("unchecked")
277 @Override
278 public E poll() {
279 final AtomicReferenceArray<E> buffer = consumerBuffer;
280 final long index = lpConsumerIndex();
281 final long mask = consumerMask;
282 final int offset = modifiedCalcCircularRefElementOffset(index, mask);
283 Object e = lvRefElement(buffer, offset);
284 if (e == null) {
285 if (index != lvProducerIndex()) {
286
287 do {
288 e = lvRefElement(buffer, offset);
289 } while (e == null);
290 } else {
291 return null;
292 }
293 }
294 if (e == JUMP) {
295 final AtomicReferenceArray<E> nextBuffer = nextBuffer(buffer, mask);
296 return newBufferPoll(nextBuffer, index);
297 }
298
299 soRefElement(buffer, offset, null);
300
301 soConsumerIndex(index + 2);
302 return (E) e;
303 }
304
305
306
307
308
309
310 @SuppressWarnings("unchecked")
311 @Override
312 public E peek() {
313 final AtomicReferenceArray<E> buffer = consumerBuffer;
314 final long index = lpConsumerIndex();
315 final long mask = consumerMask;
316 final int offset = modifiedCalcCircularRefElementOffset(index, mask);
317 Object e = lvRefElement(buffer, offset);
318 if (e == null && index != lvProducerIndex()) {
319
320 do {
321 e = lvRefElement(buffer, offset);
322 } while (e == null);
323 }
324 if (e == JUMP) {
325 return newBufferPeek(nextBuffer(buffer, mask), index);
326 }
327 return (E) e;
328 }
329
330
331
332
333 private int offerSlowPath(long mask, long pIndex, long producerLimit) {
334 final long cIndex = lvConsumerIndex();
335 long bufferCapacity = getCurrentBufferCapacity(mask);
336 if (cIndex + bufferCapacity > pIndex) {
337 if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
338
339 return RETRY;
340 } else {
341
342 return CONTINUE_TO_P_INDEX_CAS;
343 }
344 } else
345 if (availableInQueue(pIndex, cIndex) <= 0) {
346
347 return QUEUE_FULL;
348 } else
349 if (casProducerIndex(pIndex, pIndex + 1)) {
350
351 return QUEUE_RESIZE;
352 } else {
353
354 return RETRY;
355 }
356 }
357
358
359
360
361 protected abstract long availableInQueue(long pIndex, long cIndex);
362
363 @SuppressWarnings("unchecked")
364 private AtomicReferenceArray<E> nextBuffer(final AtomicReferenceArray<E> buffer, final long mask) {
365 final int offset = nextArrayOffset(mask);
366 final AtomicReferenceArray<E> nextBuffer = (AtomicReferenceArray<E>) lvRefElement(buffer, offset);
367 consumerBuffer = nextBuffer;
368 consumerMask = (length(nextBuffer) - 2) << 1;
369 soRefElement(buffer, offset, BUFFER_CONSUMED);
370 return nextBuffer;
371 }
372
373 private static int nextArrayOffset(long mask) {
374 return modifiedCalcCircularRefElementOffset(mask + 2, Long.MAX_VALUE);
375 }
376
377 private E newBufferPoll(AtomicReferenceArray<E> nextBuffer, long index) {
378 final int offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
379 final E n = lvRefElement(nextBuffer, offset);
380 if (n == null) {
381 throw new IllegalStateException("new buffer must have at least one element");
382 }
383 soRefElement(nextBuffer, offset, null);
384 soConsumerIndex(index + 2);
385 return n;
386 }
387
388 private E newBufferPeek(AtomicReferenceArray<E> nextBuffer, long index) {
389 final int offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
390 final E n = lvRefElement(nextBuffer, offset);
391 if (null == n) {
392 throw new IllegalStateException("new buffer must have at least one element");
393 }
394 return n;
395 }
396
397 @Override
398 public long currentProducerIndex() {
399 return lvProducerIndex() / 2;
400 }
401
402 @Override
403 public long currentConsumerIndex() {
404 return lvConsumerIndex() / 2;
405 }
406
407 @Override
408 public abstract int capacity();
409
410 @Override
411 public boolean relaxedOffer(E e) {
412 return offer(e);
413 }
414
415 @SuppressWarnings("unchecked")
416 @Override
417 public E relaxedPoll() {
418 final AtomicReferenceArray<E> buffer = consumerBuffer;
419 final long index = lpConsumerIndex();
420 final long mask = consumerMask;
421 final int offset = modifiedCalcCircularRefElementOffset(index, mask);
422 Object e = lvRefElement(buffer, offset);
423 if (e == null) {
424 return null;
425 }
426 if (e == JUMP) {
427 final AtomicReferenceArray<E> nextBuffer = nextBuffer(buffer, mask);
428 return newBufferPoll(nextBuffer, index);
429 }
430 soRefElement(buffer, offset, null);
431 soConsumerIndex(index + 2);
432 return (E) e;
433 }
434
435 @SuppressWarnings("unchecked")
436 @Override
437 public E relaxedPeek() {
438 final AtomicReferenceArray<E> buffer = consumerBuffer;
439 final long index = lpConsumerIndex();
440 final long mask = consumerMask;
441 final int offset = modifiedCalcCircularRefElementOffset(index, mask);
442 Object e = lvRefElement(buffer, offset);
443 if (e == JUMP) {
444 return newBufferPeek(nextBuffer(buffer, mask), index);
445 }
446 return (E) e;
447 }
448
449 @Override
450 public int fill(Supplier<E> s) {
451
452 long result = 0;
453 final int capacity = capacity();
454 do {
455 final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH);
456 if (filled == 0) {
457 return (int) result;
458 }
459 result += filled;
460 } while (result <= capacity);
461 return (int) result;
462 }
463
464 @Override
465 public int fill(Supplier<E> s, int limit) {
466 if (null == s)
467 throw new IllegalArgumentException("supplier is null");
468 if (limit < 0)
469 throw new IllegalArgumentException("limit is negative:" + limit);
470 if (limit == 0)
471 return 0;
472 long mask;
473 AtomicReferenceArray<E> buffer;
474 long pIndex;
475 int claimedSlots;
476 while (true) {
477 long producerLimit = lvProducerLimit();
478 pIndex = lvProducerIndex();
479
480 if ((pIndex & 1) == 1) {
481 continue;
482 }
483
484
485
486
487 mask = this.producerMask;
488 buffer = this.producerBuffer;
489
490
491
492 long batchIndex = Math.min(producerLimit, pIndex + 2l * limit);
493 if (pIndex >= producerLimit) {
494 int result = offerSlowPath(mask, pIndex, producerLimit);
495 switch(result) {
496 case CONTINUE_TO_P_INDEX_CAS:
497
498 case RETRY:
499 continue;
500 case QUEUE_FULL:
501 return 0;
502 case QUEUE_RESIZE:
503 resize(mask, buffer, pIndex, null, s);
504 return 1;
505 }
506 }
507
508 if (casProducerIndex(pIndex, batchIndex)) {
509 claimedSlots = (int) ((batchIndex - pIndex) / 2);
510 break;
511 }
512 }
513 for (int i = 0; i < claimedSlots; i++) {
514 final int offset = modifiedCalcCircularRefElementOffset(pIndex + 2l * i, mask);
515 soRefElement(buffer, offset, s.get());
516 }
517 return claimedSlots;
518 }
519
520 @Override
521 public void fill(Supplier<E> s, WaitStrategy wait, ExitCondition exit) {
522 MessagePassingQueueUtil.fill(this, s, wait, exit);
523 }
524
525 @Override
526 public int drain(Consumer<E> c) {
527 return drain(c, capacity());
528 }
529
530 @Override
531 public int drain(Consumer<E> c, int limit) {
532 return MessagePassingQueueUtil.drain(this, c, limit);
533 }
534
535 @Override
536 public void drain(Consumer<E> c, WaitStrategy wait, ExitCondition exit) {
537 MessagePassingQueueUtil.drain(this, c, wait, exit);
538 }
539
540
541
542
543
544
545
546
547
548
549
550 @Override
551 public Iterator<E> iterator() {
552 return new WeakIterator(consumerBuffer, lvConsumerIndex(), lvProducerIndex());
553 }
554
555
556
557
558
559 private static class WeakIterator<E> implements Iterator<E> {
560
561 private final long pIndex;
562
563 private long nextIndex;
564
565 private E nextElement;
566
567 private AtomicReferenceArray<E> currentBuffer;
568
569 private int mask;
570
571 WeakIterator(AtomicReferenceArray<E> currentBuffer, long cIndex, long pIndex) {
572 this.pIndex = pIndex >> 1;
573 this.nextIndex = cIndex >> 1;
574 setBuffer(currentBuffer);
575 nextElement = getNext();
576 }
577
578 @Override
579 public void remove() {
580 throw new UnsupportedOperationException("remove");
581 }
582
583 @Override
584 public boolean hasNext() {
585 return nextElement != null;
586 }
587
588 @Override
589 public E next() {
590 final E e = nextElement;
591 if (e == null) {
592 throw new NoSuchElementException();
593 }
594 nextElement = getNext();
595 return e;
596 }
597
598 private void setBuffer(AtomicReferenceArray<E> buffer) {
599 this.currentBuffer = buffer;
600 this.mask = length(buffer) - 2;
601 }
602
603 private E getNext() {
604 while (nextIndex < pIndex) {
605 long index = nextIndex++;
606 E e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask));
607
608 if (e == null) {
609 continue;
610 }
611
612 if (e != JUMP) {
613 return e;
614 }
615
616 int nextBufferIndex = mask + 1;
617 Object nextBuffer = lvRefElement(currentBuffer, calcRefElementOffset(nextBufferIndex));
618 if (nextBuffer == BUFFER_CONSUMED || nextBuffer == null) {
619
620 return null;
621 }
622 setBuffer((AtomicReferenceArray<E>) nextBuffer);
623
624 e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask));
625
626 if (e == null) {
627 continue;
628 } else {
629 return e;
630 }
631 }
632 return null;
633 }
634 }
635
636 private void resize(long oldMask, AtomicReferenceArray<E> oldBuffer, long pIndex, E e, Supplier<E> s) {
637 assert (e != null && s == null) || (e == null || s != null);
638 int newBufferLength = getNextBufferSize(oldBuffer);
639 final AtomicReferenceArray<E> newBuffer;
640 try {
641 newBuffer = allocateRefArray(newBufferLength);
642 } catch (OutOfMemoryError oom) {
643 assert lvProducerIndex() == pIndex + 1;
644 soProducerIndex(pIndex);
645 throw oom;
646 }
647 producerBuffer = newBuffer;
648 final int newMask = (newBufferLength - 2) << 1;
649 producerMask = newMask;
650 final int offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
651 final int offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
652
653 soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
654
655 soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);
656
657 final long cIndex = lvConsumerIndex();
658 final long availableInQueue = availableInQueue(pIndex, cIndex);
659 RangeUtil.checkPositive(availableInQueue, "availableInQueue");
660
661
662 soProducerLimit(pIndex + Math.min(newMask, availableInQueue));
663
664 soProducerIndex(pIndex + 2);
665
666
667 soRefElement(oldBuffer, offsetInOld, JUMP);
668 }
669
670
671
672
673 protected abstract int getNextBufferSize(AtomicReferenceArray<E> buffer);
674
675
676
677
678 protected abstract long getCurrentBufferCapacity(long mask);
679 }