回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright (c) 2017 Baidu, Inc. All Rights Reserve.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package com.baidu.fsg.uid.buffer;
17  
18  import java.util.concurrent.atomic.AtomicLong;
19  
20  import org.slf4j.Logger;
21  import org.slf4j.LoggerFactory;
22  import org.springframework.util.Assert;
23  
24  import com.baidu.fsg.uid.utils.PaddedAtomicLong;
25  
26  /**
27   * Represents a ring buffer based on array.<br>
28   * Using array could improve read element performance due to the CUP cache line. To prevent 
29   * the side effect of False Sharing, {@link PaddedAtomicLong} is using on 'tail' and 'cursor'<p>
30   * 
31   * A ring buffer is consisted of:
32   * <li><b>slots:</b> each element of the array is a slot, which is be set with a UID
33   * <li><b>flags:</b> flag array corresponding the same index with the slots, indicates whether can take or put slot
34   * <li><b>tail:</b> a sequence of the max slot position to produce 
35   * <li><b>cursor:</b> a sequence of the min slot position to consume
36   * 
37   * @author yutianbao
38   */
39  public class RingBuffer {
40      private static final Logger LOGGER = LoggerFactory.getLogger(RingBuffer.class);
41  
42      /** Constants */
43      private static final int START_POINT = -1;
44      private static final long CAN_PUT_FLAG = 0L;
45      private static final long CAN_TAKE_FLAG = 1L;
46      public static final int DEFAULT_PADDING_PERCENT = 50;
47  
48      /** The size of RingBuffer's slots, each slot hold a UID */
49      private final int bufferSize;
50      private final long indexMask;
51      private final long[] slots;
52      private final PaddedAtomicLong[] flags;
53  
54      /** Tail: last position sequence to produce */
55      private final AtomicLong tail = new PaddedAtomicLong(START_POINT);
56  
57      /** Cursor: current position sequence to consume */
58      private final AtomicLong cursor = new PaddedAtomicLong(START_POINT);
59  
60      /** Threshold for trigger padding buffer*/
61      private final int paddingThreshold; 
62      
63      /** Reject put/take buffer handle policy */
64      private RejectedPutBufferHandler rejectedPutHandler = this::discardPutBuffer;
65      private RejectedTakeBufferHandler rejectedTakeHandler = this::exceptionRejectedTakeBuffer; 
66      
67      /** Executor of padding buffer */
68      private BufferPaddingExecutor bufferPaddingExecutor;
69  
70      /**
71       * Constructor with buffer size, paddingFactor default as {@value #DEFAULT_PADDING_PERCENT}
72       * 
73       * @param bufferSize must be positive & a power of 2
74       */
75      public RingBuffer(int bufferSize) {
76          this(bufferSize, DEFAULT_PADDING_PERCENT);
77      }
78      
79      /**
80       * Constructor with buffer size & padding factor
81       * 
82       * @param bufferSize must be positive & a power of 2
83       * @param paddingFactor percent in (0 - 100). When the count of rest available UIDs reach the threshold, it will trigger padding buffer<br>
84       *        Sample: paddingFactor=20, bufferSize=1000 -> threshold=1000 * 20 /100,  
85       *        padding buffer will be triggered when tail-cursor<threshold
86       */
87      public RingBuffer(int bufferSize, int paddingFactor) {
88          // check buffer size is positive & a power of 2; padding factor in (0, 100)
89          Assert.isTrue(bufferSize > 0L, "RingBuffer size must be positive");
90          Assert.isTrue(Integer.bitCount(bufferSize) == 1, "RingBuffer size must be a power of 2");
91          Assert.isTrue(paddingFactor > 0 && paddingFactor < 100, "RingBuffer size must be positive");
92  
93          this.bufferSize = bufferSize;
94          this.indexMask = bufferSize - 1;
95          this.slots = new long[bufferSize];
96          this.flags = initFlags(bufferSize);
97          
98          this.paddingThreshold = bufferSize * paddingFactor / 100;
99      }
100 
101     /**
102      * Put an UID in the ring & tail moved<br>
103      * We use 'synchronized' to guarantee the UID fill in slot & publish new tail sequence as atomic operations<br>
104      * 
105      * <b>Note that: </b> It is recommended to put UID in a serialize way, cause we once batch generate a series UIDs and put
106      * the one by one into the buffer, so it is unnecessary put in multi-threads
107      *
108      * @param uid
109      * @return false means that the buffer is full, apply {@link RejectedPutBufferHandler}
110      */
111     public synchronized boolean put(long uid) {
112         long currentTail = tail.get();
113         long currentCursor = cursor.get();
114 
115         // tail catches the cursor, means that you can't put any cause of RingBuffer is full
116         long distance = currentTail - (currentCursor == START_POINT ? 0 : currentCursor);
117         if (distance == bufferSize - 1) {
118             rejectedPutHandler.rejectPutBuffer(this, uid);
119             return false;
120         }
121 
122         // 1. pre-check whether the flag is CAN_PUT_FLAG
123         int nextTailIndex = calSlotIndex(currentTail + 1);
124         if (flags[nextTailIndex].get() != CAN_PUT_FLAG) {
125             rejectedPutHandler.rejectPutBuffer(this, uid);
126             return false;
127         }
128 
129         // 2. put UID in the next slot
130         // 3. update next slot' flag to CAN_TAKE_FLAG
131         // 4. publish tail with sequence increase by one
132         slots[nextTailIndex] = uid;
133         flags[nextTailIndex].set(CAN_TAKE_FLAG);
134         tail.incrementAndGet();
135 
136         // The atomicity of operations above, guarantees by 'synchronized'. In another word,
137         // the take operation can't consume the UID we just put, until the tail is published(tail.incrementAndGet())
138         return true;
139     }
140 
141     /**
142      * Take an UID of the ring at the next cursor, this is a lock free operation by using atomic cursor<p>
143      * 
144      * Before getting the UID, we also check whether reach the padding threshold, 
145      * the padding buffer operation will be triggered in another thread<br>
146      * If there is no more available UID to be taken, the specified {@link RejectedTakeBufferHandler} will be applied<br>
147      * 
148      * @return UID
149      * @throws IllegalStateException if the cursor moved back
150      */
151     public long take() {
152         // spin get next available cursor
153         long currentCursor = cursor.get();
154         long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
155 
156         // check for safety consideration, it never occurs
157         Assert.isTrue(nextCursor >= currentCursor, "Curosr can't move back");
158 
159         // trigger padding in an async-mode if reach the threshold
160         long currentTail = tail.get();
161         if (currentTail - nextCursor < paddingThreshold) {
162             LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
163                     nextCursor, currentTail - nextCursor);
164             bufferPaddingExecutor.asyncPadding();
165         }
166 
167         // cursor catch the tail, means that there is no more available UID to take
168         if (nextCursor == currentCursor) {
169             rejectedTakeHandler.rejectTakeBuffer(this);
170         }
171 
172         // 1. check next slot flag is CAN_TAKE_FLAG
173         int nextCursorIndex = calSlotIndex(nextCursor);
174         Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");
175 
176         // 2. get UID from next slot
177         // 3. set next slot flag as CAN_PUT_FLAG.
178         long uid = slots[nextCursorIndex];
179         flags[nextCursorIndex].set(CAN_PUT_FLAG);
180 
181         // Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
182         // slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
183         return uid;
184     }
185 
186     /**
187      * Calculate slot index with the slot sequence (sequence % bufferSize) 
188      */
189     protected int calSlotIndex(long sequence) {
190         return (int) (sequence & indexMask);
191     }
192 
193     /**
194      * Discard policy for {@link RejectedPutBufferHandler}, we just do logging
195      */
196     protected void discardPutBuffer(RingBuffer ringBuffer, long uid) {
197         LOGGER.warn("Rejected putting buffer for uid:{}. {}", uid, ringBuffer);
198     }
199     
200     /**
201      * Policy for {@link RejectedTakeBufferHandler}, throws {@link RuntimeException} after logging 
202      */
203     protected void exceptionRejectedTakeBuffer(RingBuffer ringBuffer) {
204         LOGGER.warn("Rejected take buffer. {}", ringBuffer);
205         throw new RuntimeException("Rejected take buffer. " + ringBuffer);
206     }
207     
208     /**
209      * Initialize flags as CAN_PUT_FLAG
210      */
211     private PaddedAtomicLong[] initFlags(int bufferSize) {
212         PaddedAtomicLong[] flags = new PaddedAtomicLong[bufferSize];
213         for (int i = 0; i < bufferSize; i++) {
214             flags[i] = new PaddedAtomicLong(CAN_PUT_FLAG);
215         }
216         
217         return flags;
218     }
219 
220     /**
221      * Getters
222      */
223     public long getTail() {
224         return tail.get();
225     }
226 
227     public long getCursor() {
228         return cursor.get();
229     }
230 
231     public int getBufferSize() {
232         return bufferSize;
233     }
234 
235     /**
236      * Setters
237      */
238     public void setBufferPaddingExecutor(BufferPaddingExecutor bufferPaddingExecutor) {
239         this.bufferPaddingExecutor = bufferPaddingExecutor;
240     }
241 
242     public void setRejectedPutHandler(RejectedPutBufferHandler rejectedPutHandler) {
243         this.rejectedPutHandler = rejectedPutHandler;
244     }
245 
246     public void setRejectedTakeHandler(RejectedTakeBufferHandler rejectedTakeHandler) {
247         this.rejectedTakeHandler = rejectedTakeHandler;
248     }
249 
250     @Override
251     public String toString() {
252         StringBuilder builder = new StringBuilder();
253         builder.append("RingBuffer [bufferSize=").append(bufferSize)
254                .append(", tail=").append(tail)
255                .append(", cursor=").append(cursor)
256                .append(", paddingThreshold=").append(paddingThreshold).append("]");
257         
258         return builder.toString();
259     }
260 
261 }