1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.traffic;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufHolder;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelDuplexHandler;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.channel.ChannelOutboundBuffer;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.FileRegion;
29 import io.netty.util.Attribute;
30 import io.netty.util.AttributeKey;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.util.concurrent.TimeUnit;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public abstract class AbstractTrafficShapingHandler extends ChannelDuplexHandler {
53 private static final InternalLogger logger =
54 InternalLoggerFactory.getInstance(AbstractTrafficShapingHandler.class);
55
56
57
58 public static final long DEFAULT_CHECK_INTERVAL = 1000;
59
60
61
62
63
64
65 public static final long DEFAULT_MAX_TIME = 15000;
66
67
68
69
70 static final long DEFAULT_MAX_SIZE = 4 * 1024 * 1024L;
71
72
73
74
75 static final long MINIMAL_WAIT = 10;
76
77
78
79
80 protected TrafficCounter trafficCounter;
81
82
83
84
85 private volatile long writeLimit;
86
87
88
89
90 private volatile long readLimit;
91
92
93
94
95 protected volatile long maxTime = DEFAULT_MAX_TIME;
96
97
98
99
100 protected volatile long checkInterval = DEFAULT_CHECK_INTERVAL;
101
102 static final AttributeKey<Boolean> READ_SUSPENDED = AttributeKey
103 .valueOf(AbstractTrafficShapingHandler.class.getName() + ".READ_SUSPENDED");
104 static final AttributeKey<Runnable> REOPEN_TASK = AttributeKey.valueOf(AbstractTrafficShapingHandler.class
105 .getName() + ".REOPEN_TASK");
106
107
108
109
110 volatile long maxWriteDelay = 4 * DEFAULT_CHECK_INTERVAL;
111
112
113
114 volatile long maxWriteSize = DEFAULT_MAX_SIZE;
115
116
117
118
119
120 final int userDefinedWritabilityIndex;
121
122
123
124
125 static final int CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 1;
126
127
128
129
130 static final int GLOBAL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 2;
131
132
133
134
135 static final int GLOBALCHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX = 3;
136
137
138
139
140
141 void setTrafficCounter(TrafficCounter newTrafficCounter) {
142 trafficCounter = newTrafficCounter;
143 }
144
145
146
147
148
149
150
151
152 protected int userDefinedWritabilityIndex() {
153 return CHANNEL_DEFAULT_USER_DEFINED_WRITABILITY_INDEX;
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167
168 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval, long maxTime) {
169 this.maxTime = checkPositive(maxTime, "maxTime");
170
171 userDefinedWritabilityIndex = userDefinedWritabilityIndex();
172 this.writeLimit = writeLimit;
173 this.readLimit = readLimit;
174 this.checkInterval = checkInterval;
175 }
176
177
178
179
180
181
182
183
184
185
186
187 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit, long checkInterval) {
188 this(writeLimit, readLimit, checkInterval, DEFAULT_MAX_TIME);
189 }
190
191
192
193
194
195
196
197
198
199
200 protected AbstractTrafficShapingHandler(long writeLimit, long readLimit) {
201 this(writeLimit, readLimit, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
202 }
203
204
205
206
207
208 protected AbstractTrafficShapingHandler() {
209 this(0, 0, DEFAULT_CHECK_INTERVAL, DEFAULT_MAX_TIME);
210 }
211
212
213
214
215
216
217
218
219
220 protected AbstractTrafficShapingHandler(long checkInterval) {
221 this(0, 0, checkInterval, DEFAULT_MAX_TIME);
222 }
223
224
225
226
227
228
229
230
231
232
233
234
235
236 public void configure(long newWriteLimit, long newReadLimit,
237 long newCheckInterval) {
238 configure(newWriteLimit, newReadLimit);
239 configure(newCheckInterval);
240 }
241
242
243
244
245
246
247
248
249
250
251
252
253 public void configure(long newWriteLimit, long newReadLimit) {
254 writeLimit = newWriteLimit;
255 readLimit = newReadLimit;
256 if (trafficCounter != null) {
257 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
258 }
259 }
260
261
262
263
264
265
266 public void configure(long newCheckInterval) {
267 checkInterval = newCheckInterval;
268 if (trafficCounter != null) {
269 trafficCounter.configure(checkInterval);
270 }
271 }
272
273
274
275
276 public long getWriteLimit() {
277 return writeLimit;
278 }
279
280
281
282
283
284
285
286
287
288
289 public void setWriteLimit(long writeLimit) {
290 this.writeLimit = writeLimit;
291 if (trafficCounter != null) {
292 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
293 }
294 }
295
296
297
298
299 public long getReadLimit() {
300 return readLimit;
301 }
302
303
304
305
306
307
308
309
310
311
312 public void setReadLimit(long readLimit) {
313 this.readLimit = readLimit;
314 if (trafficCounter != null) {
315 trafficCounter.resetAccounting(TrafficCounter.milliSecondFromNano());
316 }
317 }
318
319
320
321
322 public long getCheckInterval() {
323 return checkInterval;
324 }
325
326
327
328
329 public void setCheckInterval(long checkInterval) {
330 this.checkInterval = checkInterval;
331 if (trafficCounter != null) {
332 trafficCounter.configure(checkInterval);
333 }
334 }
335
336
337
338
339
340
341
342
343
344
345
346
347 public void setMaxTimeWait(long maxTime) {
348 this.maxTime = checkPositive(maxTime, "maxTime");
349 }
350
351
352
353
354 public long getMaxTimeWait() {
355 return maxTime;
356 }
357
358
359
360
361 public long getMaxWriteDelay() {
362 return maxWriteDelay;
363 }
364
365
366
367
368
369
370
371
372
373
374
375 public void setMaxWriteDelay(long maxWriteDelay) {
376 this.maxWriteDelay = checkPositive(maxWriteDelay, "maxWriteDelay");
377 }
378
379
380
381
382 public long getMaxWriteSize() {
383 return maxWriteSize;
384 }
385
386
387
388
389
390
391
392
393
394
395
396
397
398 public void setMaxWriteSize(long maxWriteSize) {
399 this.maxWriteSize = maxWriteSize;
400 }
401
402
403
404
405
406
407
408
409 protected void doAccounting(TrafficCounter counter) {
410
411 }
412
413
414
415
416 static final class ReopenReadTimerTask implements Runnable {
417 final ChannelHandlerContext ctx;
418 ReopenReadTimerTask(ChannelHandlerContext ctx) {
419 this.ctx = ctx;
420 }
421
422 @Override
423 public void run() {
424 Channel channel = ctx.channel();
425 ChannelConfig config = channel.config();
426 if (!config.isAutoRead() && isHandlerActive(ctx)) {
427
428
429 if (logger.isDebugEnabled()) {
430 logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
431 isHandlerActive(ctx));
432 }
433 channel.attr(READ_SUSPENDED).set(false);
434 } else {
435
436 if (logger.isDebugEnabled()) {
437 if (config.isAutoRead() && !isHandlerActive(ctx)) {
438 if (logger.isDebugEnabled()) {
439 logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
440 isHandlerActive(ctx));
441 }
442 } else {
443 if (logger.isDebugEnabled()) {
444 logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
445 + isHandlerActive(ctx));
446 }
447 }
448 }
449 channel.attr(READ_SUSPENDED).set(false);
450 config.setAutoRead(true);
451 channel.read();
452 }
453 if (logger.isDebugEnabled()) {
454 logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
455 + isHandlerActive(ctx));
456 }
457 }
458 }
459
460
461
462
463 void releaseReadSuspended(ChannelHandlerContext ctx) {
464 Channel channel = ctx.channel();
465 channel.attr(READ_SUSPENDED).set(false);
466 channel.config().setAutoRead(true);
467 }
468
469 @Override
470 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
471 long size = calculateSize(msg);
472 long now = TrafficCounter.milliSecondFromNano();
473 if (size > 0) {
474
475 long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
476 wait = checkWaitReadTime(ctx, wait, now);
477 if (wait >= MINIMAL_WAIT) {
478
479
480 Channel channel = ctx.channel();
481 ChannelConfig config = channel.config();
482 if (logger.isDebugEnabled()) {
483 logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
484 + isHandlerActive(ctx));
485 }
486 if (config.isAutoRead() && isHandlerActive(ctx)) {
487 config.setAutoRead(false);
488 channel.attr(READ_SUSPENDED).set(true);
489
490
491 Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
492 Runnable reopenTask = attr.get();
493 if (reopenTask == null) {
494 reopenTask = new ReopenReadTimerTask(ctx);
495 attr.set(reopenTask);
496 }
497 ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
498 if (logger.isDebugEnabled()) {
499 logger.debug("Suspend final status => " + config.isAutoRead() + ':'
500 + isHandlerActive(ctx) + " will reopened at: " + wait);
501 }
502 }
503 }
504 }
505 informReadOperation(ctx, now);
506 ctx.fireChannelRead(msg);
507 }
508
509 @Override
510 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
511 Channel channel = ctx.channel();
512 if (channel.hasAttr(REOPEN_TASK)) {
513
514 channel.attr(REOPEN_TASK).set(null);
515 }
516 super.handlerRemoved(ctx);
517 }
518
519
520
521
522
523
524
525 long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
526
527 return wait;
528 }
529
530
531
532
533
534 void informReadOperation(final ChannelHandlerContext ctx, final long now) {
535
536 }
537
538 protected static boolean isHandlerActive(ChannelHandlerContext ctx) {
539 Boolean suspended = ctx.channel().attr(READ_SUSPENDED).get();
540 return suspended == null || Boolean.FALSE.equals(suspended);
541 }
542
543 @Override
544 public void read(ChannelHandlerContext ctx) {
545 if (isHandlerActive(ctx)) {
546
547 ctx.read();
548 }
549 }
550
551 @Override
552 public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
553 throws Exception {
554 long size = calculateSize(msg);
555 long now = TrafficCounter.milliSecondFromNano();
556 if (size > 0) {
557
558 long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
559 if (wait >= MINIMAL_WAIT) {
560 if (logger.isDebugEnabled()) {
561 logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
562 + isHandlerActive(ctx));
563 }
564 submitWrite(ctx, msg, size, wait, now, promise);
565 return;
566 }
567 }
568
569 submitWrite(ctx, msg, size, 0, now, promise);
570 }
571
572 @Deprecated
573 protected void submitWrite(final ChannelHandlerContext ctx, final Object msg,
574 final long delay, final ChannelPromise promise) {
575 submitWrite(ctx, msg, calculateSize(msg),
576 delay, TrafficCounter.milliSecondFromNano(), promise);
577 }
578
579 abstract void submitWrite(
580 ChannelHandlerContext ctx, Object msg, long size, long delay, long now, ChannelPromise promise);
581
582 @Override
583 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
584 setUserDefinedWritability(ctx, true);
585 super.channelRegistered(ctx);
586 }
587
588 void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
589 ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
590 if (cob != null) {
591 cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
592 }
593 }
594
595
596
597
598
599
600
601 void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
602 if (queueSize > maxWriteSize || delay > maxWriteDelay) {
603 setUserDefinedWritability(ctx, false);
604 }
605 }
606
607
608
609 void releaseWriteSuspended(ChannelHandlerContext ctx) {
610 setUserDefinedWritability(ctx, true);
611 }
612
613
614
615
616
617 public TrafficCounter trafficCounter() {
618 return trafficCounter;
619 }
620
621 @Override
622 public String toString() {
623 StringBuilder builder = new StringBuilder(290)
624 .append("TrafficShaping with Write Limit: ").append(writeLimit)
625 .append(" Read Limit: ").append(readLimit)
626 .append(" CheckInterval: ").append(checkInterval)
627 .append(" maxDelay: ").append(maxWriteDelay)
628 .append(" maxSize: ").append(maxWriteSize)
629 .append(" and Counter: ");
630 if (trafficCounter != null) {
631 builder.append(trafficCounter);
632 } else {
633 builder.append("none");
634 }
635 return builder.toString();
636 }
637
638
639
640
641
642
643
644
645
646 protected long calculateSize(Object msg) {
647 if (msg instanceof ByteBuf) {
648 return ((ByteBuf) msg).readableBytes();
649 }
650 if (msg instanceof ByteBufHolder) {
651 return ((ByteBufHolder) msg).content().readableBytes();
652 }
653 if (msg instanceof FileRegion) {
654 return ((FileRegion) msg).count();
655 }
656 return -1;
657 }
658 }