1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.AbstractReferenceCountedByteBuf;
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.ByteBufHolder;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.socket.nio.NioSocketChannel;
23 import io.netty.util.Recycler.EnhancedHandle;
24 import io.netty.util.ReferenceCountUtil;
25 import io.netty.util.concurrent.FastThreadLocal;
26 import io.netty.util.internal.InternalThreadLocalMap;
27 import io.netty.util.internal.ObjectPool;
28 import io.netty.util.internal.ObjectPool.Handle;
29 import io.netty.util.internal.ObjectPool.ObjectCreator;
30 import io.netty.util.internal.ObjectUtil;
31 import io.netty.util.internal.PromiseNotificationUtil;
32 import io.netty.util.internal.SystemPropertyUtil;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.nio.ByteBuffer;
37 import java.nio.channels.ClosedChannelException;
38 import java.util.Arrays;
39 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
41
42 import static java.lang.Math.min;
43
44
45
46
47
48
49
50
51
52
53
54
55 public final class ChannelOutboundBuffer {
56
57
58
59
60
61
62
63 static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
64 SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
65
66 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
67
68 private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
69 @Override
70 protected ByteBuffer[] initialValue() throws Exception {
71 return new ByteBuffer[1024];
72 }
73 };
74
75 private final Channel channel;
76
77
78
79
80 private Entry flushedEntry;
81
82 private Entry unflushedEntry;
83
84 private Entry tailEntry;
85
86 private int flushed;
87
88 private int nioBufferCount;
89 private long nioBufferSize;
90
91 private boolean inFail;
92
93 private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
94 AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
95
96 @SuppressWarnings("UnusedDeclaration")
97 private volatile long totalPendingSize;
98
99 private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
100 AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
101
102 @SuppressWarnings("UnusedDeclaration")
103 private volatile int unwritable;
104
105 private volatile Runnable fireChannelWritabilityChangedTask;
106
107 ChannelOutboundBuffer(AbstractChannel channel) {
108 this.channel = channel;
109 }
110
111
112
113
114
115 public void addMessage(Object msg, int size, ChannelPromise promise) {
116 Entry entry = Entry.newInstance(msg, size, total(msg), promise);
117 if (tailEntry == null) {
118 flushedEntry = null;
119 } else {
120 Entry tail = tailEntry;
121 tail.next = entry;
122 }
123 tailEntry = entry;
124 if (unflushedEntry == null) {
125 unflushedEntry = entry;
126 }
127
128
129
130
131
132 if (msg instanceof AbstractReferenceCountedByteBuf) {
133 ((AbstractReferenceCountedByteBuf) msg).touch();
134 } else {
135 ReferenceCountUtil.touch(msg);
136 }
137
138
139
140 incrementPendingOutboundBytes(entry.pendingSize, false);
141 }
142
143
144
145
146
147 public void addFlush() {
148
149
150
151
152 Entry entry = unflushedEntry;
153 if (entry != null) {
154 if (flushedEntry == null) {
155
156 flushedEntry = entry;
157 }
158 do {
159 flushed ++;
160 if (!entry.promise.setUncancellable()) {
161
162 int pending = entry.cancel();
163 decrementPendingOutboundBytes(pending, false, true);
164 }
165 entry = entry.next;
166 } while (entry != null);
167
168
169 unflushedEntry = null;
170 }
171 }
172
173
174
175
176
177 void incrementPendingOutboundBytes(long size) {
178 incrementPendingOutboundBytes(size, true);
179 }
180
181 private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
182 if (size == 0) {
183 return;
184 }
185
186 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
187 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
188 setUnwritable(invokeLater);
189 }
190 }
191
192
193
194
195
196 void decrementPendingOutboundBytes(long size) {
197 decrementPendingOutboundBytes(size, true, true);
198 }
199
200 private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
201 if (size == 0) {
202 return;
203 }
204
205 long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
206 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
207 setWritable(invokeLater);
208 }
209 }
210
211 private static long total(Object msg) {
212 if (msg instanceof ByteBuf) {
213 return ((ByteBuf) msg).readableBytes();
214 }
215 if (msg instanceof FileRegion) {
216 return ((FileRegion) msg).count();
217 }
218 if (msg instanceof ByteBufHolder) {
219 return ((ByteBufHolder) msg).content().readableBytes();
220 }
221 return -1;
222 }
223
224
225
226
227 public Object current() {
228 Entry entry = flushedEntry;
229 if (entry == null) {
230 return null;
231 }
232
233 return entry.msg;
234 }
235
236
237
238
239
240 public long currentProgress() {
241 Entry entry = flushedEntry;
242 if (entry == null) {
243 return 0;
244 }
245 return entry.progress;
246 }
247
248
249
250
251 public void progress(long amount) {
252 Entry e = flushedEntry;
253 assert e != null;
254 ChannelPromise p = e.promise;
255 long progress = e.progress + amount;
256 e.progress = progress;
257 assert p != null;
258 final Class<?> promiseClass = p.getClass();
259
260 if (promiseClass == VoidChannelPromise.class || promiseClass == DefaultChannelPromise.class) {
261 return;
262 }
263
264 if (p instanceof DefaultChannelProgressivePromise) {
265 ((DefaultChannelProgressivePromise) p).tryProgress(progress, e.total);
266 } else if (p instanceof ChannelProgressivePromise) {
267 ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
268 }
269 }
270
271
272
273
274
275
276 public boolean remove() {
277 Entry e = flushedEntry;
278 if (e == null) {
279 clearNioBuffers();
280 return false;
281 }
282 Object msg = e.msg;
283
284 ChannelPromise promise = e.promise;
285 int size = e.pendingSize;
286
287 removeEntry(e);
288
289
290 if (!e.cancelled) {
291
292
293 if (msg instanceof AbstractReferenceCountedByteBuf) {
294 try {
295
296 ((AbstractReferenceCountedByteBuf) msg).release();
297 } catch (Throwable t) {
298 logger.warn("Failed to release a ByteBuf: {}", msg, t);
299 }
300 } else {
301 ReferenceCountUtil.safeRelease(msg);
302 }
303 safeSuccess(promise);
304 decrementPendingOutboundBytes(size, false, true);
305 }
306
307
308 e.unguardedRecycle();
309
310 return true;
311 }
312
313
314
315
316
317
318 public boolean remove(Throwable cause) {
319 return remove0(cause, true);
320 }
321
322 private boolean remove0(Throwable cause, boolean notifyWritability) {
323 Entry e = flushedEntry;
324 if (e == null) {
325 clearNioBuffers();
326 return false;
327 }
328 Object msg = e.msg;
329
330 ChannelPromise promise = e.promise;
331 int size = e.pendingSize;
332
333 removeEntry(e);
334
335 if (!e.cancelled) {
336
337 ReferenceCountUtil.safeRelease(msg);
338
339 safeFail(promise, cause);
340 decrementPendingOutboundBytes(size, false, notifyWritability);
341 }
342
343
344 e.unguardedRecycle();
345
346 return true;
347 }
348
349 private void removeEntry(Entry e) {
350 if (-- flushed == 0) {
351
352 flushedEntry = null;
353 if (e == tailEntry) {
354 tailEntry = null;
355 unflushedEntry = null;
356 }
357 } else {
358 flushedEntry = e.next;
359 }
360 }
361
362
363
364
365
366 public void removeBytes(long writtenBytes) {
367 for (;;) {
368 Object msg = current();
369 if (!(msg instanceof ByteBuf)) {
370 assert writtenBytes == 0;
371 break;
372 }
373
374 final ByteBuf buf = (ByteBuf) msg;
375 final int readerIndex = buf.readerIndex();
376 final int readableBytes = buf.writerIndex() - readerIndex;
377
378 if (readableBytes <= writtenBytes) {
379 if (writtenBytes != 0) {
380 progress(readableBytes);
381 writtenBytes -= readableBytes;
382 }
383 remove();
384 } else {
385 if (writtenBytes != 0) {
386 buf.readerIndex(readerIndex + (int) writtenBytes);
387 progress(writtenBytes);
388 }
389 break;
390 }
391 }
392 clearNioBuffers();
393 }
394
395
396
397 private void clearNioBuffers() {
398 int count = nioBufferCount;
399 if (count > 0) {
400 nioBufferCount = 0;
401 Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
402 }
403 }
404
405
406
407
408
409
410
411
412
413
414
415 public ByteBuffer[] nioBuffers() {
416 return nioBuffers(Integer.MAX_VALUE, Integer.MAX_VALUE);
417 }
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433 public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
434 assert maxCount > 0;
435 assert maxBytes > 0;
436 long nioBufferSize = 0;
437 int nioBufferCount = 0;
438 final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
439 ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
440 Entry entry = flushedEntry;
441 while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
442 if (!entry.cancelled) {
443 ByteBuf buf = (ByteBuf) entry.msg;
444 final int readerIndex = buf.readerIndex();
445 final int readableBytes = buf.writerIndex() - readerIndex;
446
447 if (readableBytes > 0) {
448 if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
449
450
451
452
453
454
455
456
457
458
459
460 break;
461 }
462 nioBufferSize += readableBytes;
463 int count = entry.count;
464 if (count == -1) {
465
466 entry.count = count = buf.nioBufferCount();
467 }
468 int neededSpace = min(maxCount, nioBufferCount + count);
469 if (neededSpace > nioBuffers.length) {
470 nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
471 NIO_BUFFERS.set(threadLocalMap, nioBuffers);
472 }
473 if (count == 1) {
474 ByteBuffer nioBuf = entry.buf;
475 if (nioBuf == null) {
476
477
478 entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
479 }
480 nioBuffers[nioBufferCount++] = nioBuf;
481 } else {
482
483
484 nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
485 }
486 if (nioBufferCount >= maxCount) {
487 break;
488 }
489 }
490 }
491 entry = entry.next;
492 }
493 this.nioBufferCount = nioBufferCount;
494 this.nioBufferSize = nioBufferSize;
495
496 return nioBuffers;
497 }
498
499 private static int nioBuffers(Entry entry, ByteBuf buf, ByteBuffer[] nioBuffers, int nioBufferCount, int maxCount) {
500 ByteBuffer[] nioBufs = entry.bufs;
501 if (nioBufs == null) {
502
503
504 entry.bufs = nioBufs = buf.nioBuffers();
505 }
506 for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
507 ByteBuffer nioBuf = nioBufs[i];
508 if (nioBuf == null) {
509 break;
510 } else if (!nioBuf.hasRemaining()) {
511 continue;
512 }
513 nioBuffers[nioBufferCount++] = nioBuf;
514 }
515 return nioBufferCount;
516 }
517
518 private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, int neededSpace, int size) {
519 int newCapacity = array.length;
520 do {
521
522
523 newCapacity <<= 1;
524
525 if (newCapacity < 0) {
526 throw new IllegalStateException();
527 }
528
529 } while (neededSpace > newCapacity);
530
531 ByteBuffer[] newArray = new ByteBuffer[newCapacity];
532 System.arraycopy(array, 0, newArray, 0, size);
533
534 return newArray;
535 }
536
537
538
539
540
541
542 public int nioBufferCount() {
543 return nioBufferCount;
544 }
545
546
547
548
549
550
551 public long nioBufferSize() {
552 return nioBufferSize;
553 }
554
555
556
557
558
559
560
561 public boolean isWritable() {
562 return unwritable == 0;
563 }
564
565
566
567
568
569 public boolean getUserDefinedWritability(int index) {
570 return (unwritable & writabilityMask(index)) == 0;
571 }
572
573
574
575
576 public void setUserDefinedWritability(int index, boolean writable) {
577 if (writable) {
578 setUserDefinedWritability(index);
579 } else {
580 clearUserDefinedWritability(index);
581 }
582 }
583
584 private void setUserDefinedWritability(int index) {
585 final int mask = ~writabilityMask(index);
586 for (;;) {
587 final int oldValue = unwritable;
588 final int newValue = oldValue & mask;
589 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
590 if (oldValue != 0 && newValue == 0) {
591 fireChannelWritabilityChanged(true);
592 }
593 break;
594 }
595 }
596 }
597
598 private void clearUserDefinedWritability(int index) {
599 final int mask = writabilityMask(index);
600 for (;;) {
601 final int oldValue = unwritable;
602 final int newValue = oldValue | mask;
603 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
604 if (oldValue == 0 && newValue != 0) {
605 fireChannelWritabilityChanged(true);
606 }
607 break;
608 }
609 }
610 }
611
612 private static int writabilityMask(int index) {
613 if (index < 1 || index > 31) {
614 throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
615 }
616 return 1 << index;
617 }
618
619 private void setWritable(boolean invokeLater) {
620 for (;;) {
621 final int oldValue = unwritable;
622 final int newValue = oldValue & ~1;
623 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
624 if (oldValue != 0 && newValue == 0) {
625 fireChannelWritabilityChanged(invokeLater);
626 }
627 break;
628 }
629 }
630 }
631
632 private void setUnwritable(boolean invokeLater) {
633 for (;;) {
634 final int oldValue = unwritable;
635 final int newValue = oldValue | 1;
636 if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
637 if (oldValue == 0) {
638 fireChannelWritabilityChanged(invokeLater);
639 }
640 break;
641 }
642 }
643 }
644
645 private void fireChannelWritabilityChanged(boolean invokeLater) {
646 final ChannelPipeline pipeline = channel.pipeline();
647 if (invokeLater) {
648 Runnable task = fireChannelWritabilityChangedTask;
649 if (task == null) {
650 fireChannelWritabilityChangedTask = task = new Runnable() {
651 @Override
652 public void run() {
653 pipeline.fireChannelWritabilityChanged();
654 }
655 };
656 }
657 channel.eventLoop().execute(task);
658 } else {
659 pipeline.fireChannelWritabilityChanged();
660 }
661 }
662
663
664
665
666 public int size() {
667 return flushed;
668 }
669
670
671
672
673
674 public boolean isEmpty() {
675 return flushed == 0;
676 }
677
678 void failFlushed(Throwable cause, boolean notify) {
679
680
681
682
683
684 if (inFail) {
685 return;
686 }
687
688 try {
689 inFail = true;
690 for (;;) {
691 if (!remove0(cause, notify)) {
692 break;
693 }
694 }
695 } finally {
696 inFail = false;
697 }
698 }
699
700 void close(final Throwable cause, final boolean allowChannelOpen) {
701 if (inFail) {
702 channel.eventLoop().execute(new Runnable() {
703 @Override
704 public void run() {
705 close(cause, allowChannelOpen);
706 }
707 });
708 return;
709 }
710
711 inFail = true;
712
713 if (!allowChannelOpen && channel.isOpen()) {
714 throw new IllegalStateException("close() must be invoked after the channel is closed.");
715 }
716
717 if (!isEmpty()) {
718 throw new IllegalStateException("close() must be invoked after all flushed writes are handled.");
719 }
720
721
722 try {
723 Entry e = unflushedEntry;
724 while (e != null) {
725
726 int size = e.pendingSize;
727 TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
728
729 if (!e.cancelled) {
730 ReferenceCountUtil.safeRelease(e.msg);
731 safeFail(e.promise, cause);
732 }
733 e = e.unguardedRecycleAndGetNext();
734 }
735 } finally {
736 inFail = false;
737 }
738 clearNioBuffers();
739 }
740
741 void close(ClosedChannelException cause) {
742 close(cause, false);
743 }
744
745 private static void safeSuccess(ChannelPromise promise) {
746
747
748 PromiseNotificationUtil.trySuccess(promise, null, promise instanceof VoidChannelPromise ? null : logger);
749 }
750
751 private static void safeFail(ChannelPromise promise, Throwable cause) {
752
753
754 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
755 }
756
757 @Deprecated
758 public void recycle() {
759
760 }
761
762 public long totalPendingWriteBytes() {
763 return totalPendingSize;
764 }
765
766
767
768
769
770 public long bytesBeforeUnwritable() {
771
772 long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize + 1;
773
774
775
776 return bytes > 0 && isWritable() ? bytes : 0;
777 }
778
779
780
781
782
783 public long bytesBeforeWritable() {
784
785 long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark() + 1;
786
787
788
789 return bytes <= 0 || isWritable() ? 0 : bytes;
790 }
791
792
793
794
795
796
797 public void forEachFlushedMessage(MessageProcessor processor) throws Exception {
798 ObjectUtil.checkNotNull(processor, "processor");
799
800 Entry entry = flushedEntry;
801 if (entry == null) {
802 return;
803 }
804
805 do {
806 if (!entry.cancelled) {
807 if (!processor.processMessage(entry.msg)) {
808 return;
809 }
810 }
811 entry = entry.next;
812 } while (isFlushedEntry(entry));
813 }
814
815 private boolean isFlushedEntry(Entry e) {
816 return e != null && e != unflushedEntry;
817 }
818
819 public interface MessageProcessor {
820
821
822
823
824 boolean processMessage(Object msg) throws Exception;
825 }
826
827 static final class Entry {
828 private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
829 @Override
830 public Entry newObject(Handle<Entry> handle) {
831 return new Entry(handle);
832 }
833 });
834
835 private final EnhancedHandle<Entry> handle;
836 Entry next;
837 Object msg;
838 ByteBuffer[] bufs;
839 ByteBuffer buf;
840 ChannelPromise promise;
841 long progress;
842 long total;
843 int pendingSize;
844 int count = -1;
845 boolean cancelled;
846
847 private Entry(Handle<Entry> handle) {
848 this.handle = (EnhancedHandle<Entry>) handle;
849 }
850
851 static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
852 Entry entry = RECYCLER.get();
853 entry.msg = msg;
854 entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
855 entry.total = total;
856 entry.promise = promise;
857 return entry;
858 }
859
860 int cancel() {
861 if (!cancelled) {
862 cancelled = true;
863 int pSize = pendingSize;
864
865
866 ReferenceCountUtil.safeRelease(msg);
867 msg = Unpooled.EMPTY_BUFFER;
868
869 pendingSize = 0;
870 total = 0;
871 progress = 0;
872 bufs = null;
873 buf = null;
874 return pSize;
875 }
876 return 0;
877 }
878
879 void unguardedRecycle() {
880 next = null;
881 bufs = null;
882 buf = null;
883 msg = null;
884 promise = null;
885 progress = 0;
886 total = 0;
887 pendingSize = 0;
888 count = -1;
889 cancelled = false;
890 handle.unguardedRecycle(this);
891 }
892
893 Entry unguardedRecycleAndGetNext() {
894 Entry next = this.next;
895 unguardedRecycle();
896 return next;
897 }
898 }
899 }