1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.traffic;
17
18 import static io.netty.util.internal.ObjectUtil.checkNotNull;
19 import static io.netty.util.internal.ObjectUtil.checkNotNullWithIAE;
20 import io.netty.util.internal.logging.InternalLogger;
21 import io.netty.util.internal.logging.InternalLoggerFactory;
22
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.ScheduledFuture;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicLong;
27
28
29
30
31
32
33
34
35
36
37
38 public class TrafficCounter {
39
40 private static final InternalLogger logger = InternalLoggerFactory.getInstance(TrafficCounter.class);
41
42
43
44
45 public static long milliSecondFromNano() {
46 return System.nanoTime() / 1000000;
47 }
48
49
50
51
52 private final AtomicLong currentWrittenBytes = new AtomicLong();
53
54
55
56
57 private final AtomicLong currentReadBytes = new AtomicLong();
58
59
60
61
62 private long writingTime;
63
64
65
66
67 private long readingTime;
68
69
70
71
72 private final AtomicLong cumulativeWrittenBytes = new AtomicLong();
73
74
75
76
77 private final AtomicLong cumulativeReadBytes = new AtomicLong();
78
79
80
81
82 private long lastCumulativeTime;
83
84
85
86
87 private long lastWriteThroughput;
88
89
90
91
92 private long lastReadThroughput;
93
94
95
96
97 final AtomicLong lastTime = new AtomicLong();
98
99
100
101
102 private volatile long lastWrittenBytes;
103
104
105
106
107 private volatile long lastReadBytes;
108
109
110
111
112 private volatile long lastWritingTime;
113
114
115
116
117 private volatile long lastReadingTime;
118
119
120
121
122 private final AtomicLong realWrittenBytes = new AtomicLong();
123
124
125
126
127 private long realWriteThroughput;
128
129
130
131
132 final AtomicLong checkInterval = new AtomicLong(
133 AbstractTrafficShapingHandler.DEFAULT_CHECK_INTERVAL);
134
135
136
137
138
139
140 final String name;
141
142
143
144
145 final AbstractTrafficShapingHandler trafficShapingHandler;
146
147
148
149
150 final ScheduledExecutorService executor;
151
152
153
154 Runnable monitor;
155
156
157
158 volatile ScheduledFuture<?> scheduledFuture;
159
160
161
162
163 volatile boolean monitorActive;
164
165
166
167
168
169 private final class TrafficMonitoringTask implements Runnable {
170 @Override
171 public void run() {
172 if (!monitorActive) {
173 return;
174 }
175 resetAccounting(milliSecondFromNano());
176 if (trafficShapingHandler != null) {
177 trafficShapingHandler.doAccounting(TrafficCounter.this);
178 }
179 }
180 }
181
182
183
184
185 public synchronized void start() {
186 if (monitorActive) {
187 return;
188 }
189 lastTime.set(milliSecondFromNano());
190 long localCheckInterval = checkInterval.get();
191
192 if (localCheckInterval > 0 && executor != null) {
193 monitorActive = true;
194 monitor = new TrafficMonitoringTask();
195 scheduledFuture =
196 executor.scheduleAtFixedRate(monitor, 0, localCheckInterval, TimeUnit.MILLISECONDS);
197 }
198 }
199
200
201
202
203 public synchronized void stop() {
204 if (!monitorActive) {
205 return;
206 }
207 monitorActive = false;
208 resetAccounting(milliSecondFromNano());
209 if (trafficShapingHandler != null) {
210 trafficShapingHandler.doAccounting(this);
211 }
212 if (scheduledFuture != null) {
213 scheduledFuture.cancel(true);
214 }
215 }
216
217
218
219
220
221
222 synchronized void resetAccounting(long newLastTime) {
223 long interval = newLastTime - lastTime.getAndSet(newLastTime);
224 if (interval == 0) {
225
226 return;
227 }
228 if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
229 logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
230 }
231 lastReadBytes = currentReadBytes.getAndSet(0);
232 lastWrittenBytes = currentWrittenBytes.getAndSet(0);
233 lastReadThroughput = lastReadBytes * 1000 / interval;
234
235 lastWriteThroughput = lastWrittenBytes * 1000 / interval;
236
237 realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
238 lastWritingTime = Math.max(lastWritingTime, writingTime);
239 lastReadingTime = Math.max(lastReadingTime, readingTime);
240 }
241
242
243
244
245
246
247
248
249
250
251
252
253
254 public TrafficCounter(ScheduledExecutorService executor, String name, long checkInterval) {
255
256 this.name = checkNotNull(name, "name");
257 trafficShapingHandler = null;
258 this.executor = executor;
259
260 init(checkInterval);
261 }
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277 public TrafficCounter(
278 AbstractTrafficShapingHandler trafficShapingHandler, ScheduledExecutorService executor,
279 String name, long checkInterval) {
280 this.name = checkNotNull(name, "name");
281 this.trafficShapingHandler = checkNotNullWithIAE(trafficShapingHandler, "trafficShapingHandler");
282 this.executor = executor;
283
284 init(checkInterval);
285 }
286
287 private void init(long checkInterval) {
288
289 lastCumulativeTime = System.currentTimeMillis();
290 writingTime = milliSecondFromNano();
291 readingTime = writingTime;
292 lastWritingTime = writingTime;
293 lastReadingTime = writingTime;
294 configure(checkInterval);
295 }
296
297
298
299
300
301
302 public void configure(long newCheckInterval) {
303 long newInterval = newCheckInterval / 10 * 10;
304 if (checkInterval.getAndSet(newInterval) != newInterval) {
305 if (newInterval <= 0) {
306 stop();
307
308 lastTime.set(milliSecondFromNano());
309 } else {
310
311 stop();
312 start();
313 }
314 }
315 }
316
317
318
319
320
321
322
323 void bytesRecvFlowControl(long recv) {
324 currentReadBytes.addAndGet(recv);
325 cumulativeReadBytes.addAndGet(recv);
326 }
327
328
329
330
331
332
333
334 void bytesWriteFlowControl(long write) {
335 currentWrittenBytes.addAndGet(write);
336 cumulativeWrittenBytes.addAndGet(write);
337 }
338
339
340
341
342
343
344
345 void bytesRealWriteFlowControl(long write) {
346 realWrittenBytes.addAndGet(write);
347 }
348
349
350
351
352
353 public long checkInterval() {
354 return checkInterval.get();
355 }
356
357
358
359
360 public long lastReadThroughput() {
361 return lastReadThroughput;
362 }
363
364
365
366
367 public long lastWriteThroughput() {
368 return lastWriteThroughput;
369 }
370
371
372
373
374 public long lastReadBytes() {
375 return lastReadBytes;
376 }
377
378
379
380
381 public long lastWrittenBytes() {
382 return lastWrittenBytes;
383 }
384
385
386
387
388 public long currentReadBytes() {
389 return currentReadBytes.get();
390 }
391
392
393
394
395 public long currentWrittenBytes() {
396 return currentWrittenBytes.get();
397 }
398
399
400
401
402 public long lastTime() {
403 return lastTime.get();
404 }
405
406
407
408
409 public long cumulativeWrittenBytes() {
410 return cumulativeWrittenBytes.get();
411 }
412
413
414
415
416 public long cumulativeReadBytes() {
417 return cumulativeReadBytes.get();
418 }
419
420
421
422
423
424 public long lastCumulativeTime() {
425 return lastCumulativeTime;
426 }
427
428
429
430
431 public AtomicLong getRealWrittenBytes() {
432 return realWrittenBytes;
433 }
434
435
436
437
438 public long getRealWriteThroughput() {
439 return realWriteThroughput;
440 }
441
442
443
444
445
446 public void resetCumulativeTime() {
447 lastCumulativeTime = System.currentTimeMillis();
448 cumulativeReadBytes.set(0);
449 cumulativeWrittenBytes.set(0);
450 }
451
452
453
454
455 public String name() {
456 return name;
457 }
458
459
460
461
462
463
464
465
466
467
468
469
470
471 @Deprecated
472 public long readTimeToWait(final long size, final long limitTraffic, final long maxTime) {
473 return readTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
474 }
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489 public long readTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
490 bytesRecvFlowControl(size);
491 if (size == 0 || limitTraffic == 0) {
492 return 0;
493 }
494 final long lastTimeCheck = lastTime.get();
495 long sum = currentReadBytes.get();
496 long localReadingTime = readingTime;
497 long lastRB = lastReadBytes;
498 final long interval = now - lastTimeCheck;
499 long pastDelay = Math.max(lastReadingTime - lastTimeCheck, 0);
500 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
501
502 long time = sum * 1000 / limitTraffic - interval + pastDelay;
503 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
504 if (logger.isDebugEnabled()) {
505 logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
506 }
507 if (time > maxTime && now + time - localReadingTime > maxTime) {
508 time = maxTime;
509 }
510 readingTime = Math.max(localReadingTime, now + time);
511 return time;
512 }
513 readingTime = Math.max(localReadingTime, now);
514 return 0;
515 }
516
517 long lastsum = sum + lastRB;
518 long lastinterval = interval + checkInterval.get();
519 long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
520 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
521 if (logger.isDebugEnabled()) {
522 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
523 }
524 if (time > maxTime && now + time - localReadingTime > maxTime) {
525 time = maxTime;
526 }
527 readingTime = Math.max(localReadingTime, now + time);
528 return time;
529 }
530 readingTime = Math.max(localReadingTime, now);
531 return 0;
532 }
533
534
535
536
537
538
539
540
541
542
543
544
545
546 @Deprecated
547 public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime) {
548 return writeTimeToWait(size, limitTraffic, maxTime, milliSecondFromNano());
549 }
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564 public long writeTimeToWait(final long size, final long limitTraffic, final long maxTime, final long now) {
565 bytesWriteFlowControl(size);
566 if (size == 0 || limitTraffic == 0) {
567 return 0;
568 }
569 final long lastTimeCheck = lastTime.get();
570 long sum = currentWrittenBytes.get();
571 long lastWB = lastWrittenBytes;
572 long localWritingTime = writingTime;
573 long pastDelay = Math.max(lastWritingTime - lastTimeCheck, 0);
574 final long interval = now - lastTimeCheck;
575 if (interval > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
576
577 long time = sum * 1000 / limitTraffic - interval + pastDelay;
578 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
579 if (logger.isDebugEnabled()) {
580 logger.debug("Time: " + time + ':' + sum + ':' + interval + ':' + pastDelay);
581 }
582 if (time > maxTime && now + time - localWritingTime > maxTime) {
583 time = maxTime;
584 }
585 writingTime = Math.max(localWritingTime, now + time);
586 return time;
587 }
588 writingTime = Math.max(localWritingTime, now);
589 return 0;
590 }
591
592 long lastsum = sum + lastWB;
593 long lastinterval = interval + checkInterval.get();
594 long time = lastsum * 1000 / limitTraffic - lastinterval + pastDelay;
595 if (time > AbstractTrafficShapingHandler.MINIMAL_WAIT) {
596 if (logger.isDebugEnabled()) {
597 logger.debug("Time: " + time + ':' + lastsum + ':' + lastinterval + ':' + pastDelay);
598 }
599 if (time > maxTime && now + time - localWritingTime > maxTime) {
600 time = maxTime;
601 }
602 writingTime = Math.max(localWritingTime, now + time);
603 return time;
604 }
605 writingTime = Math.max(localWritingTime, now);
606 return 0;
607 }
608
609 @Override
610 public String toString() {
611 return new StringBuilder(165).append("Monitor ").append(name)
612 .append(" Current Speed Read: ").append(lastReadThroughput >> 10).append(" KB/s, ")
613 .append("Asked Write: ").append(lastWriteThroughput >> 10).append(" KB/s, ")
614 .append("Real Write: ").append(realWriteThroughput >> 10).append(" KB/s, ")
615 .append("Current Read: ").append(currentReadBytes.get() >> 10).append(" KB, ")
616 .append("Current asked Write: ").append(currentWrittenBytes.get() >> 10).append(" KB, ")
617 .append("Current real Write: ").append(realWrittenBytes.get() >> 10).append(" KB").toString();
618 }
619 }