回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright (c) 2017 Baidu, Inc. All Rights Reserve.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package com.baidu.fsg.uid.buffer;
17  
18  import java.util.List;
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.ScheduledExecutorService;
22  import java.util.concurrent.TimeUnit;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  import org.springframework.util.Assert;
28  
29  import com.baidu.fsg.uid.utils.NamingThreadFactory;
30  import com.baidu.fsg.uid.utils.PaddedAtomicLong;
31  
32  /**
33   * Represents an executor for padding {@link RingBuffer}<br>
34   * There are two kinds of executors: one for scheduled padding, the other for padding immediately.
35   * 
36   * @author yutianbao
37   */
38  public class BufferPaddingExecutor {
39      private static final Logger LOGGER = LoggerFactory.getLogger(RingBuffer.class);
40  
41      /** Constants */
42      private static final String WORKER_NAME = "RingBuffer-Padding-Worker";
43      private static final String SCHEDULE_NAME = "RingBuffer-Padding-Schedule";
44      private static final long DEFAULT_SCHEDULE_INTERVAL = 5 * 60L; // 5 minutes
45      
46      /** Whether buffer padding is running */
47      private final AtomicBoolean running;
48  
49      /** We can borrow UIDs from the future, here store the last second we have consumed */
50      private final PaddedAtomicLong lastSecond;
51  
52      /** RingBuffer & BufferUidProvider */
53      private final RingBuffer ringBuffer;
54      private final BufferedUidProvider uidProvider;
55  
56      /** Padding immediately by the thread pool */
57      private final ExecutorService bufferPadExecutors;
58      /** Padding schedule thread */
59      private final ScheduledExecutorService bufferPadSchedule;
60      
61      /** Schedule interval Unit as seconds */
62      private long scheduleInterval = DEFAULT_SCHEDULE_INTERVAL;
63  
64      /**
65       * Constructor with {@link RingBuffer} and {@link BufferedUidProvider}, default use schedule
66       *
67       * @param ringBuffer {@link RingBuffer}
68       * @param uidProvider {@link BufferedUidProvider}
69       */
70      public BufferPaddingExecutor(RingBuffer ringBuffer, BufferedUidProvider uidProvider) {
71          this(ringBuffer, uidProvider, true);
72      }
73  
74      /**
75       * Constructor with {@link RingBuffer}, {@link BufferedUidProvider}, and whether use schedule padding
76       *
77       * @param ringBuffer {@link RingBuffer}
78       * @param uidProvider {@link BufferedUidProvider}
79       * @param usingSchedule
80       */
81      public BufferPaddingExecutor(RingBuffer ringBuffer, BufferedUidProvider uidProvider, boolean usingSchedule) {
82          this.running = new AtomicBoolean(false);
83          this.lastSecond = new PaddedAtomicLong(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
84          this.ringBuffer = ringBuffer;
85          this.uidProvider = uidProvider;
86  
87          // initialize thread pool
88          int cores = Runtime.getRuntime().availableProcessors();
89          bufferPadExecutors = Executors.newFixedThreadPool(cores * 2, new NamingThreadFactory(WORKER_NAME));
90  
91          // initialize schedule thread
92          if (usingSchedule) {
93              bufferPadSchedule = Executors.newSingleThreadScheduledExecutor(new NamingThreadFactory(SCHEDULE_NAME));
94          } else {
95              bufferPadSchedule = null;
96          }
97      }
98  
99      /**
100      * Start executors such as schedule
101      */
102     public void start() {
103         if (bufferPadSchedule != null) {
104             bufferPadSchedule.scheduleWithFixedDelay(() -> paddingBuffer(), scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
105         }
106     }
107 
108     /**
109      * Shutdown executors
110      */
111     public void shutdown() {
112         if (!bufferPadExecutors.isShutdown()) {
113             bufferPadExecutors.shutdownNow();
114         }
115 
116         if (bufferPadSchedule != null && !bufferPadSchedule.isShutdown()) {
117             bufferPadSchedule.shutdownNow();
118         }
119     }
120 
121     /**
122      * Whether is padding
123      *
124      * @return
125      */
126     public boolean isRunning() {
127         return running.get();
128     }
129 
130     /**
131      * Padding buffer in the thread pool
132      */
133     public void asyncPadding() {
134         bufferPadExecutors.submit(this::paddingBuffer);
135     }
136 
137     /**
138      * Padding buffer fill the slots until to catch the cursor
139      */
140     public void paddingBuffer() {
141         LOGGER.info("Ready to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
142 
143         // is still running
144         if (!running.compareAndSet(false, true)) {
145             LOGGER.info("Padding buffer is still running. {}", ringBuffer);
146             return;
147         }
148 
149         // fill the rest slots until to catch the cursor
150         boolean isFullRingBuffer = false;
151         while (!isFullRingBuffer) {
152             List<Long> uidList = uidProvider.provide(lastSecond.incrementAndGet());
153             for (Long uid : uidList) {
154                 isFullRingBuffer = !ringBuffer.put(uid);
155                 if (isFullRingBuffer) {
156                     break;
157                 }
158             }
159         }
160 
161         // not running now
162         running.compareAndSet(true, false);
163         LOGGER.info("End to padding buffer lastSecond:{}. {}", lastSecond.get(), ringBuffer);
164     }
165 
166     /**
167      * Setters
168      */
169     public void setScheduleInterval(long scheduleInterval) {
170         Assert.isTrue(scheduleInterval > 0, "Schedule interval must positive!");
171         this.scheduleInterval = scheduleInterval;
172     }
173     
174 }