1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.traffic;
17
18 import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE;
19 import static io.netty.util.internal.ObjectUtil.checkPositive;
20 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
21
22 import io.netty.buffer.ByteBuf;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelHandler.Sharable;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelHandlerContext;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.util.Attribute;
29 import io.netty.util.concurrent.EventExecutor;
30 import io.netty.util.internal.PlatformDependent;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.util.AbstractCollection;
35 import java.util.ArrayDeque;
36 import java.util.Collection;
37 import java.util.Iterator;
38 import java.util.concurrent.ConcurrentMap;
39 import java.util.concurrent.ScheduledExecutorService;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicLong;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 @Sharable
91 public class GlobalChannelTrafficShapingHandler extends AbstractTrafficShapingHandler {
92 private static final InternalLogger logger =
93 InternalLoggerFactory.getInstance(GlobalChannelTrafficShapingHandler.class);
94
95
96
97 final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
98
99
100
101
102 private final AtomicLong queuesSize = new AtomicLong();
103
104
105
106
107 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
108
109
110
111
112 private final AtomicLong cumulativeReadBytes = new AtomicLong();
113
114
115
116
117
118 volatile long maxGlobalWriteSize = DEFAULT_MAX_SIZE * 100;
119
120
121
122
123 private volatile long writeChannelLimit;
124
125
126
127
128 private volatile long readChannelLimit;
129
130 private static final float DEFAULT_DEVIATION = 0.1F;
131 private static final float MAX_DEVIATION = 0.4F;
132 private static final float DEFAULT_SLOWDOWN = 0.4F;
133 private static final float DEFAULT_ACCELERATION = -0.1F;
134 private volatile float maxDeviation;
135 private volatile float accelerationFactor;
136 private volatile float slowDownFactor;
137 private volatile boolean readDeviationActive;
138 private volatile boolean writeDeviationActive;
139
140 static final class PerChannel {
141 ArrayDeque<ToSend> messagesQueue;
142 TrafficCounter channelTrafficCounter;
143 long queueSize;
144 long lastWriteTimestamp;
145 long lastReadTimestamp;
146 }
147
148
149
150
151 void createGlobalTrafficCounter(ScheduledExecutorService executor) {
152
153 setMaxDeviation(DEFAULT_DEVIATION, DEFAULT_SLOWDOWN, DEFAULT_ACCELERATION);
154 checkNotNullWithIAE(executor, "executor");
155 TrafficCounter tc = new GlobalChannelTrafficCounter(this, executor, "GlobalChannelTC", checkInterval);
156 setTrafficCounter(tc);
157 tc.start();
158 }
159
160 @Override
161 protected int userDefinedWritabilityIndex() {
162 return AbstractTrafficShapingHandler.GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
163 }
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184 public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
185 long writeGlobalLimit, long readGlobalLimit,
186 long writeChannelLimit, long readChannelLimit,
187 long checkInterval, long maxTime) {
188 super(writeGlobalLimit, readGlobalLimit, checkInterval, maxTime);
189 createGlobalTrafficCounter(executor);
190 this.writeChannelLimit = writeChannelLimit;
191 this.readChannelLimit = readChannelLimit;
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211 public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
212 long writeGlobalLimit, long readGlobalLimit,
213 long writeChannelLimit, long readChannelLimit,
214 long checkInterval) {
215 super(writeGlobalLimit, readGlobalLimit, checkInterval);
216 this.writeChannelLimit = writeChannelLimit;
217 this.readChannelLimit = readChannelLimit;
218 createGlobalTrafficCounter(executor);
219 }
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235 public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
236 long writeGlobalLimit, long readGlobalLimit,
237 long writeChannelLimit, long readChannelLimit) {
238 super(writeGlobalLimit, readGlobalLimit);
239 this.writeChannelLimit = writeChannelLimit;
240 this.readChannelLimit = readChannelLimit;
241 createGlobalTrafficCounter(executor);
242 }
243
244
245
246
247
248
249
250
251
252
253 public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor, long checkInterval) {
254 super(checkInterval);
255 createGlobalTrafficCounter(executor);
256 }
257
258
259
260
261
262
263
264 public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor) {
265 createGlobalTrafficCounter(executor);
266 }
267
268
269
270
271 public float maxDeviation() {
272 return maxDeviation;
273 }
274
275
276
277
278 public float accelerationFactor() {
279 return accelerationFactor;
280 }
281
282
283
284
285 public float slowDownFactor() {
286 return slowDownFactor;
287 }
288
289
290
291
292
293
294
295
296
297
298
299
300 public void setMaxDeviation(float maxDeviation, float slowDownFactor, float accelerationFactor) {
301 if (maxDeviation > MAX_DEVIATION) {
302 throw new IllegalArgumentException("maxDeviation must be <= " + MAX_DEVIATION);
303 }
304 checkPositiveOrZero(slowDownFactor, "slowDownFactor");
305 if (accelerationFactor > 0) {
306 throw new IllegalArgumentException("accelerationFactor must be <= 0");
307 }
308 this.maxDeviation = maxDeviation;
309 this.accelerationFactor = 1 + accelerationFactor;
310 this.slowDownFactor = 1 + slowDownFactor;
311 }
312
313 private void computeDeviationCumulativeBytes() {
314
315 long maxWrittenBytes = 0;
316 long maxReadBytes = 0;
317 long minWrittenBytes = Long.MAX_VALUE;
318 long minReadBytes = Long.MAX_VALUE;
319 for (PerChannel perChannel : channelQueues.values()) {
320 long value = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
321 if (maxWrittenBytes < value) {
322 maxWrittenBytes = value;
323 }
324 if (minWrittenBytes > value) {
325 minWrittenBytes = value;
326 }
327 value = perChannel.channelTrafficCounter.cumulativeReadBytes();
328 if (maxReadBytes < value) {
329 maxReadBytes = value;
330 }
331 if (minReadBytes > value) {
332 minReadBytes = value;
333 }
334 }
335 boolean multiple = channelQueues.size() > 1;
336 readDeviationActive = multiple && minReadBytes < maxReadBytes / 2;
337 writeDeviationActive = multiple && minWrittenBytes < maxWrittenBytes / 2;
338 cumulativeWrittenBytes.set(maxWrittenBytes);
339 cumulativeReadBytes.set(maxReadBytes);
340 }
341
342 @Override
343 protected void doAccounting(TrafficCounter counter) {
344 computeDeviationCumulativeBytes();
345 super.doAccounting(counter);
346 }
347
348 private long computeBalancedWait(float maxLocal, float maxGlobal, long wait) {
349 if (maxGlobal == 0) {
350
351 return wait;
352 }
353 float ratio = maxLocal / maxGlobal;
354
355 if (ratio > maxDeviation) {
356 if (ratio < 1 - maxDeviation) {
357 return wait;
358 } else {
359 ratio = slowDownFactor;
360 if (wait < MINIMAL_WAIT) {
361 wait = MINIMAL_WAIT;
362 }
363 }
364 } else {
365 ratio = accelerationFactor;
366 }
367 return (long) (wait * ratio);
368 }
369
370
371
372
373 public long getMaxGlobalWriteSize() {
374 return maxGlobalWriteSize;
375 }
376
377
378
379
380
381
382
383
384
385
386
387 public void setMaxGlobalWriteSize(long maxGlobalWriteSize) {
388 this.maxGlobalWriteSize = checkPositive(maxGlobalWriteSize, "maxGlobalWriteSize");
389 }
390
391
392
393
394 public long queuesSize() {
395 return queuesSize.get();
396 }
397
398
399
400
401
402 public void configureChannel(long newWriteLimit, long newReadLimit) {
403 writeChannelLimit = newWriteLimit;
404 readChannelLimit = newReadLimit;
405 long now = TrafficCounter.milliSecondFromNano();
406 for (PerChannel perChannel : channelQueues.values()) {
407 perChannel.channelTrafficCounter.resetAccounting(now);
408 }
409 }
410
411
412
413
414 public long getWriteChannelLimit() {
415 return writeChannelLimit;
416 }
417
418
419
420
421 public void setWriteChannelLimit(long writeLimit) {
422 writeChannelLimit = writeLimit;
423 long now = TrafficCounter.milliSecondFromNano();
424 for (PerChannel perChannel : channelQueues.values()) {
425 perChannel.channelTrafficCounter.resetAccounting(now);
426 }
427 }
428
429
430
431
432 public long getReadChannelLimit() {
433 return readChannelLimit;
434 }
435
436
437
438
439 public void setReadChannelLimit(long readLimit) {
440 readChannelLimit = readLimit;
441 long now = TrafficCounter.milliSecondFromNano();
442 for (PerChannel perChannel : channelQueues.values()) {
443 perChannel.channelTrafficCounter.resetAccounting(now);
444 }
445 }
446
447
448
449
450 public final void release() {
451 trafficCounter.stop();
452 }
453
454 private PerChannel getOrSetPerChannel(ChannelHandlerContext ctx) {
455
456 Channel channel = ctx.channel();
457 Integer key = channel.hashCode();
458 PerChannel perChannel = channelQueues.get(key);
459 if (perChannel == null) {
460 perChannel = new PerChannel();
461 perChannel.messagesQueue = new ArrayDeque<ToSend>();
462
463 perChannel.channelTrafficCounter = new TrafficCounter(this, null, "ChannelTC" +
464 ctx.channel().hashCode(), checkInterval);
465 perChannel.queueSize = 0L;
466 perChannel.lastReadTimestamp = TrafficCounter.milliSecondFromNano();
467 perChannel.lastWriteTimestamp = perChannel.lastReadTimestamp;
468 channelQueues.put(key, perChannel);
469 }
470 return perChannel;
471 }
472
473 @Override
474 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
475 getOrSetPerChannel(ctx);
476 trafficCounter.resetCumulativeTime();
477 super.handlerAdded(ctx);
478 }
479
480 @Override
481 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
482 trafficCounter.resetCumulativeTime();
483 Channel channel = ctx.channel();
484 Integer key = channel.hashCode();
485 PerChannel perChannel = channelQueues.remove(key);
486 if (perChannel != null) {
487
488 synchronized (perChannel) {
489 if (channel.isActive()) {
490 for (ToSend toSend : perChannel.messagesQueue) {
491 long size = calculateSize(toSend.toSend);
492 trafficCounter.bytesRealWriteFlowControl(size);
493 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
494 perChannel.queueSize -= size;
495 queuesSize.addAndGet(-size);
496 ctx.write(toSend.toSend, toSend.promise);
497 }
498 } else {
499 queuesSize.addAndGet(-perChannel.queueSize);
500 for (ToSend toSend : perChannel.messagesQueue) {
501 if (toSend.toSend instanceof ByteBuf) {
502 ((ByteBuf) toSend.toSend).release();
503 }
504 }
505 }
506 perChannel.messagesQueue.clear();
507 }
508 }
509 releaseWriteSuspended(ctx);
510 releaseReadSuspended(ctx);
511 super.handlerRemoved(ctx);
512 }
513
514 @Override
515 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
516 long size = calculateSize(msg);
517 long now = TrafficCounter.milliSecondFromNano();
518 if (size > 0) {
519
520 long waitGlobal = trafficCounter.readTimeToWait(size, getReadLimit(), maxTime, now);
521 Integer key = ctx.channel().hashCode();
522 PerChannel perChannel = channelQueues.get(key);
523 long wait = 0;
524 if (perChannel != null) {
525 wait = perChannel.channelTrafficCounter.readTimeToWait(size, readChannelLimit, maxTime, now);
526 if (readDeviationActive) {
527
528 long maxLocalRead;
529 maxLocalRead = perChannel.channelTrafficCounter.cumulativeReadBytes();
530 long maxGlobalRead = cumulativeReadBytes.get();
531 if (maxLocalRead <= 0) {
532 maxLocalRead = 0;
533 }
534 if (maxGlobalRead < maxLocalRead) {
535 maxGlobalRead = maxLocalRead;
536 }
537 wait = computeBalancedWait(maxLocalRead, maxGlobalRead, wait);
538 }
539 }
540 if (wait < waitGlobal) {
541 wait = waitGlobal;
542 }
543 wait = checkWaitReadTime(ctx, wait, now);
544 if (wait >= MINIMAL_WAIT) {
545
546
547 Channel channel = ctx.channel();
548 ChannelConfig config = channel.config();
549 if (logger.isDebugEnabled()) {
550 logger.debug("Read Suspend: " + wait + ':' + config.isAutoRead() + ':'
551 + isHandlerActive(ctx));
552 }
553 if (config.isAutoRead() && isHandlerActive(ctx)) {
554 config.setAutoRead(false);
555 channel.attr(READ_SUSPENDED).set(true);
556
557
558 Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
559 Runnable reopenTask = attr.get();
560 if (reopenTask == null) {
561 reopenTask = new ReopenReadTimerTask(ctx);
562 attr.set(reopenTask);
563 }
564 ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
565 if (logger.isDebugEnabled()) {
566 logger.debug("Suspend final status => " + config.isAutoRead() + ':'
567 + isHandlerActive(ctx) + " will reopened at: " + wait);
568 }
569 }
570 }
571 }
572 informReadOperation(ctx, now);
573 ctx.fireChannelRead(msg);
574 }
575
576 @Override
577 protected long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
578 Integer key = ctx.channel().hashCode();
579 PerChannel perChannel = channelQueues.get(key);
580 if (perChannel != null) {
581 if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
582 wait = maxTime;
583 }
584 }
585 return wait;
586 }
587
588 @Override
589 protected void informReadOperation(final ChannelHandlerContext ctx, final long now) {
590 Integer key = ctx.channel().hashCode();
591 PerChannel perChannel = channelQueues.get(key);
592 if (perChannel != null) {
593 perChannel.lastReadTimestamp = now;
594 }
595 }
596
597 private static final class ToSend {
598 final long relativeTimeAction;
599 final Object toSend;
600 final ChannelPromise promise;
601 final long size;
602
603 private ToSend(final long delay, final Object toSend, final long size, final ChannelPromise promise) {
604 relativeTimeAction = delay;
605 this.toSend = toSend;
606 this.size = size;
607 this.promise = promise;
608 }
609 }
610
611 protected long maximumCumulativeWrittenBytes() {
612 return cumulativeWrittenBytes.get();
613 }
614
615 protected long maximumCumulativeReadBytes() {
616 return cumulativeReadBytes.get();
617 }
618
619
620
621
622
623 public Collection<TrafficCounter> channelTrafficCounters() {
624 return new AbstractCollection<TrafficCounter>() {
625 @Override
626 public Iterator<TrafficCounter> iterator() {
627 return new Iterator<TrafficCounter>() {
628 final Iterator<PerChannel> iter = channelQueues.values().iterator();
629 @Override
630 public boolean hasNext() {
631 return iter.hasNext();
632 }
633 @Override
634 public TrafficCounter next() {
635 return iter.next().channelTrafficCounter;
636 }
637 @Override
638 public void remove() {
639 throw new UnsupportedOperationException();
640 }
641 };
642 }
643 @Override
644 public int size() {
645 return channelQueues.size();
646 }
647 };
648 }
649
650 @Override
651 public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
652 throws Exception {
653 long size = calculateSize(msg);
654 long now = TrafficCounter.milliSecondFromNano();
655 if (size > 0) {
656
657 long waitGlobal = trafficCounter.writeTimeToWait(size, getWriteLimit(), maxTime, now);
658 Integer key = ctx.channel().hashCode();
659 PerChannel perChannel = channelQueues.get(key);
660 long wait = 0;
661 if (perChannel != null) {
662 wait = perChannel.channelTrafficCounter.writeTimeToWait(size, writeChannelLimit, maxTime, now);
663 if (writeDeviationActive) {
664
665 long maxLocalWrite;
666 maxLocalWrite = perChannel.channelTrafficCounter.cumulativeWrittenBytes();
667 long maxGlobalWrite = cumulativeWrittenBytes.get();
668 if (maxLocalWrite <= 0) {
669 maxLocalWrite = 0;
670 }
671 if (maxGlobalWrite < maxLocalWrite) {
672 maxGlobalWrite = maxLocalWrite;
673 }
674 wait = computeBalancedWait(maxLocalWrite, maxGlobalWrite, wait);
675 }
676 }
677 if (wait < waitGlobal) {
678 wait = waitGlobal;
679 }
680 if (wait >= MINIMAL_WAIT) {
681 if (logger.isDebugEnabled()) {
682 logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
683 + isHandlerActive(ctx));
684 }
685 submitWrite(ctx, msg, size, wait, now, promise);
686 return;
687 }
688 }
689
690 submitWrite(ctx, msg, size, 0, now, promise);
691 }
692
693 @Override
694 protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
695 final long size, final long writedelay, final long now,
696 final ChannelPromise promise) {
697 Channel channel = ctx.channel();
698 Integer key = channel.hashCode();
699 PerChannel perChannel = channelQueues.get(key);
700 if (perChannel == null) {
701
702
703 perChannel = getOrSetPerChannel(ctx);
704 }
705 final ToSend newToSend;
706 long delay = writedelay;
707 boolean globalSizeExceeded = false;
708
709 synchronized (perChannel) {
710 if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
711 trafficCounter.bytesRealWriteFlowControl(size);
712 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
713 ctx.write(msg, promise);
714 perChannel.lastWriteTimestamp = now;
715 return;
716 }
717 if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
718 delay = maxTime;
719 }
720 newToSend = new ToSend(delay + now, msg, size, promise);
721 perChannel.messagesQueue.addLast(newToSend);
722 perChannel.queueSize += size;
723 queuesSize.addAndGet(size);
724 checkWriteSuspend(ctx, delay, perChannel.queueSize);
725 if (queuesSize.get() > maxGlobalWriteSize) {
726 globalSizeExceeded = true;
727 }
728 }
729 if (globalSizeExceeded) {
730 setUserDefinedWritability(ctx, false);
731 }
732 final long futureNow = newToSend.relativeTimeAction;
733 final PerChannel forSchedule = perChannel;
734 ctx.executor().schedule(new Runnable() {
735 @Override
736 public void run() {
737 sendAllValid(ctx, forSchedule, futureNow);
738 }
739 }, delay, TimeUnit.MILLISECONDS);
740 }
741
742 private void sendAllValid(final ChannelHandlerContext ctx, final PerChannel perChannel, final long now) {
743
744 synchronized (perChannel) {
745 ToSend newToSend = perChannel.messagesQueue.pollFirst();
746 for (; newToSend != null; newToSend = perChannel.messagesQueue.pollFirst()) {
747 if (newToSend.relativeTimeAction <= now) {
748 long size = newToSend.size;
749 trafficCounter.bytesRealWriteFlowControl(size);
750 perChannel.channelTrafficCounter.bytesRealWriteFlowControl(size);
751 perChannel.queueSize -= size;
752 queuesSize.addAndGet(-size);
753 ctx.write(newToSend.toSend, newToSend.promise);
754 perChannel.lastWriteTimestamp = now;
755 } else {
756 perChannel.messagesQueue.addFirst(newToSend);
757 break;
758 }
759 }
760 if (perChannel.messagesQueue.isEmpty()) {
761 releaseWriteSuspended(ctx);
762 }
763 }
764 ctx.flush();
765 }
766
767 @Override
768 public String toString() {
769 return new StringBuilder(340).append(super.toString())
770 .append(" Write Channel Limit: ").append(writeChannelLimit)
771 .append(" Read Channel Limit: ").append(readChannelLimit).toString();
772 }
773 }