1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.baidu.fsg.uid.buffer;
17
18 import java.util.List;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.ScheduledExecutorService;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicBoolean;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import org.springframework.util.Assert;
28
29 import com.baidu.fsg.uid.utils.NamingThreadFactory;
30 import com.baidu.fsg.uid.utils.PaddedAtomicLong;
31
32
33
34
35
36
37
38 public class BufferPaddingExecutor {
39 private static final Logger LOGGER = LoggerFactory.getLogger(RingBuffer.class);
40
41
42 private static final String WORKER_NAME = "RingBuffer-Padding-Worker";
43 private static final String SCHEDULE_NAME = "RingBuffer-Padding-Schedule";
44 private static final long DEFAULT_SCHEDULE_INTERVAL = 5 * 60L;
45
46
47 private final AtomicBoolean running;
48
49
50 private final PaddedAtomicLong lastSecond;
51
52
53 private final RingBuffer ringBuffer;
54 private final BufferedUidProvider uidProvider;
55
56
57 private final ExecutorService bufferPadExecutors;
58
59 private final ScheduledExecutorService bufferPadSchedule;
60
61
62 private long scheduleInterval = DEFAULT_SCHEDULE_INTERVAL;
63
64
65
66
67
68
69
70 public BufferPaddingExecutor(RingBuffer ringBuffer, BufferedUidProvider uidProvider) {
71 this(ringBuffer, uidProvider, true);
72 }
73
74
75
76
77
78
79
80
81 public BufferPaddingExecutor(RingBuffer ringBuffer, BufferedUidProvider uidProvider, boolean usingSchedule) {
82 this.running = new AtomicBoolean(false);
83 this.lastSecond = new PaddedAtomicLong(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
84 this.ringBuffer = ringBuffer;
85 this.uidProvider = uidProvider;
86
87
88 int cores = Runtime.getRuntime().availableProcessors();
89 bufferPadExecutors = Executors.newFixedThreadPool(cores * 2, new NamingThreadFactory(WORKER_NAME));
90
91
92 if (usingSchedule) {
93 bufferPadSchedule = Executors.newSingleThreadScheduledExecutor(new NamingThreadFactory(SCHEDULE_NAME));
94 } else {
95 bufferPadSchedule = null;
96 }
97 }
98
99
100
101
102 public void start() {
103 if (bufferPadSchedule != null) {
104 bufferPadSchedule.scheduleWithFixedDelay(() -> paddingBuffer(), scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
105 }
106 }
107
108
109
110
111 public void shutdown() {
112 if (!bufferPadExecutors.isShutdown()) {
113 bufferPadExecutors.shutdownNow();
114 }
115
116 if (bufferPadSchedule != null && !bufferPadSchedule.isShutdown()) {
117 bufferPadSchedule.shutdownNow();
118 }
119 }
120
121
122
123
124
125
126 public boolean isRunning() {
127 return running.get();
128 }
129
130
131
132
133 public void asyncPadding() {
134 bufferPadExecutors.submit(this::paddingBuffer);
135 }
136
137
138
139
140 public void paddingBuffer() {
141 LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
142
143
144 if (!running.compareAndSet(false, true)) {
145 LOGGER.info("Padding buffer is still running. {}", ringBuffer);
146 return;
147 }
148
149
150 boolean isFullRingBuffer = false;
151 while (!isFullRingBuffer) {
152 List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
153 for (Long uid : uidList) {
154 isFullRingBuffer = !ringBuffer.put(uid);
155 if (isFullRingBuffer) {
156 break;
157 }
158 }
159 }
160
161
162 running.compareAndSet(true, false);
163 LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
164 }
165
166
167
168
169 public void setScheduleInterval(long scheduleInterval) {
170 Assert.isTrue(scheduleInterval > 0, "Schedule interval must positive!");
171 this.scheduleInterval = scheduleInterval;
172 }
173
174 }