回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright (c) 2017 Baidu, Inc. All Rights Reserve.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package com.baidu.fsg.uid.impl;
17  
18  import java.util.ArrayList;
19  import java.util.List;
20  
21  import org.slf4j.Logger;
22  import org.slf4j.LoggerFactory;
23  import org.springframework.beans.factory.DisposableBean;
24  import org.springframework.util.Assert;
25  
26  import com.baidu.fsg.uid.BitsAllocator;
27  import com.baidu.fsg.uid.UidGenerator;
28  import com.baidu.fsg.uid.buffer.BufferPaddingExecutor;
29  import com.baidu.fsg.uid.buffer.RejectedPutBufferHandler;
30  import com.baidu.fsg.uid.buffer.RejectedTakeBufferHandler;
31  import com.baidu.fsg.uid.buffer.RingBuffer;
32  import com.baidu.fsg.uid.exception.UidGenerateException;
33  
34  /**
35   * Represents a cached implementation of {@link UidGenerator} extends
36   * from {@link DefaultUidGenerator}, based on a lock free {@link RingBuffer}<p>
37   * 
38   * The spring properties you can specified as below:<br>
39   * <li><b>boostPower:</b> RingBuffer size boost for a power of 2, Sample: boostPower is 3, it means the buffer size 
40   *                        will be <code>({@link BitsAllocator#getMaxSequence()} + 1) &lt;&lt;
41   *                        {@link #boostPower}</code>, Default as {@value #DEFAULT_BOOST_POWER}
42   * <li><b>paddingFactor:</b> Represents a percent value of (0 - 100). When the count of rest available UIDs reach the 
43   *                           threshold, it will trigger padding buffer. Default as{@link RingBuffer#DEFAULT_PADDING_PERCENT}
44   *                           Sample: paddingFactor=20, bufferSize=1000 -> threshold=1000 * 20 /100, padding buffer will be triggered when tail-cursor<threshold
45   * <li><b>scheduleInterval:</b> Padding buffer in a schedule, specify padding buffer interval, Unit as second
46   * <li><b>rejectedPutBufferHandler:</b> Policy for rejected put buffer. Default as discard put request, just do logging
47   * <li><b>rejectedTakeBufferHandler:</b> Policy for rejected take buffer. Default as throwing up an exception
48   * 
49   * @author yutianbao
50   */
51  public class CachedUidGenerator extends DefaultUidGenerator implements DisposableBean {
52      private static final Logger LOGGER = LoggerFactory.getLogger(CachedUidGenerator.class);
53      private static final int DEFAULT_BOOST_POWER = 3;
54  
55      /** Spring properties */
56      private int boostPower = DEFAULT_BOOST_POWER;
57      private int paddingFactor = RingBuffer.DEFAULT_PADDING_PERCENT;
58      private Long scheduleInterval;
59      
60      private RejectedPutBufferHandler rejectedPutBufferHandler;
61      private RejectedTakeBufferHandler rejectedTakeBufferHandler;
62  
63      /** RingBuffer */
64      private RingBuffer ringBuffer;
65      private BufferPaddingExecutor bufferPaddingExecutor;
66  
67      @Override
68      public void afterPropertiesSet() throws Exception {
69          // initialize workerId & bitsAllocator
70          super.afterPropertiesSet();
71          
72          // initialize RingBuffer & RingBufferPaddingExecutor
73          this.initRingBuffer();
74          LOGGER.info("Initialized RingBuffer successfully.");
75      }
76      
77      @Override
78      public long getUID() {
79          try {
80              return ringBuffer.take();
81          } catch (Exception e) {
82              LOGGER.error("Generate unique id exception. ", e);
83              throw new UidGenerateException(e);
84          }
85      }
86  
87      @Override
88      public String parseUID(long uid) {
89          return super.parseUID(uid);
90      }
91      
92      @Override
93      public void destroy() throws Exception {
94          bufferPaddingExecutor.shutdown();
95      }
96  
97      /**
98       * Get the UIDs in the same specified second under the max sequence
99       * 
100      * @param currentSecond
101      * @return UID list, size of {@link BitsAllocator#getMaxSequence()} + 1
102      */
103     protected List<Long> nextIdsForOneSecond(long currentSecond) {
104         // Initialize result list size of (max sequence + 1)
105         int listSize = (int) bitsAllocator.getMaxSequence() + 1;
106         List<Long> uidList = new ArrayList<>(listSize);
107 
108         // Allocate the first sequence of the second, the others can be calculated with the offset
109         long firstSeqUid = bitsAllocator.allocate(currentSecond - epochSeconds, workerId, 0L);
110         for (int offset = 0; offset < listSize; offset++) {
111             uidList.add(firstSeqUid + offset);
112         }
113 
114         return uidList;
115     }
116     
117     /**
118      * Initialize RingBuffer & RingBufferPaddingExecutor
119      */
120     private void initRingBuffer() {
121         // initialize RingBuffer
122         int bufferSize = ((int) bitsAllocator.getMaxSequence() + 1) << boostPower;
123         this.ringBuffer = new RingBuffer(bufferSize, paddingFactor);
124         LOGGER.info("Initialized ring buffer size:{}, paddingFactor:{}", bufferSize, paddingFactor);
125 
126         // initialize RingBufferPaddingExecutor
127         boolean usingSchedule = (scheduleInterval != null);
128         this.bufferPaddingExecutor = new BufferPaddingExecutor(ringBuffer, this::nextIdsForOneSecond, usingSchedule);
129         if (usingSchedule) {
130             bufferPaddingExecutor.setScheduleInterval(scheduleInterval);
131         }
132         
133         LOGGER.info("Initialized BufferPaddingExecutor. Using schdule:{}, interval:{}", usingSchedule, scheduleInterval);
134         
135         // set rejected put/take handle policy
136         this.ringBuffer.setBufferPaddingExecutor(bufferPaddingExecutor);
137         if (rejectedPutBufferHandler != null) {
138             this.ringBuffer.setRejectedPutHandler(rejectedPutBufferHandler);
139         }
140         if (rejectedTakeBufferHandler != null) {
141             this.ringBuffer.setRejectedTakeHandler(rejectedTakeBufferHandler);
142         }
143         
144         // fill in all slots of the RingBuffer
145         bufferPaddingExecutor.paddingBuffer();
146         
147         // start buffer padding threads
148         bufferPaddingExecutor.start();
149     }
150 
151     /**
152      * Setters for spring property
153      */
154     public void setBoostPower(int boostPower) {
155         Assert.isTrue(boostPower > 0, "Boost power must be positive!");
156         this.boostPower = boostPower;
157     }
158     
159     public void setRejectedPutBufferHandler(RejectedPutBufferHandler rejectedPutBufferHandler) {
160         Assert.notNull(rejectedPutBufferHandler, "RejectedPutBufferHandler can't be null!");
161         this.rejectedPutBufferHandler = rejectedPutBufferHandler;
162     }
163 
164     public void setRejectedTakeBufferHandler(RejectedTakeBufferHandler rejectedTakeBufferHandler) {
165         Assert.notNull(rejectedTakeBufferHandler, "RejectedTakeBufferHandler can't be null!");
166         this.rejectedTakeBufferHandler = rejectedTakeBufferHandler;
167     }
168 
169     public void setScheduleInterval(long scheduleInterval) {
170         Assert.isTrue(scheduleInterval > 0, "Schedule interval must positive!");
171         this.scheduleInterval = scheduleInterval;
172     }
173 
174 }