查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import java.util.ArrayList;
19  import java.util.List;
20  
21  import static io.netty.util.internal.ObjectUtil.checkPositive;
22  import static java.lang.Math.max;
23  import static java.lang.Math.min;
24  
25  /**
26   * The {@link RecvByteBufAllocator} that automatically increases and
27   * decreases the predicted buffer size on feed back.
28   * <p>
29   * It gradually increases the expected number of readable bytes if the previous
30   * read fully filled the allocated buffer.  It gradually decreases the expected
31   * number of readable bytes if the read operation was not able to fill a certain
32   * amount of the allocated buffer two times consecutively.  Otherwise, it keeps
33   * returning the same prediction.
34   */
35  public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
36  
37      static final int DEFAULT_MINIMUM = 64;
38      // Use an initial value that is bigger than the common MTU of 1500
39      static final int DEFAULT_INITIAL = 2048;
40      static final int DEFAULT_MAXIMUM = 65536;
41  
42      private static final int INDEX_INCREMENT = 4;
43      private static final int INDEX_DECREMENT = 1;
44  
45      private static final int[] SIZE_TABLE;
46  
47      static {
48          List<Integer> sizeTable = new ArrayList<Integer>();
49          for (int i = 16; i < 512; i += 16) {
50              sizeTable.add(i);
51          }
52  
53          // Suppress a warning since i becomes negative when an integer overflow happens
54          for (int i = 512; i > 0; i <<= 1) {
55              sizeTable.add(i);
56          }
57  
58          SIZE_TABLE = new int[sizeTable.size()];
59          for (int i = 0; i < SIZE_TABLE.length; i ++) {
60              SIZE_TABLE[i] = sizeTable.get(i);
61          }
62      }
63  
64      /**
65       * @deprecated There is state for {@link #maxMessagesPerRead()} which is typically based upon channel type.
66       */
67      @Deprecated
68      public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();
69  
70      private static int getSizeTableIndex(final int size) {
71          for (int low = 0, high = SIZE_TABLE.length - 1;;) {
72              if (high < low) {
73                  return low;
74              }
75              if (high == low) {
76                  return high;
77              }
78  
79              int mid = low + high >>> 1;
80              int a = SIZE_TABLE[mid];
81              int b = SIZE_TABLE[mid + 1];
82              if (size > b) {
83                  low = mid + 1;
84              } else if (size < a) {
85                  high = mid - 1;
86              } else if (size == a) {
87                  return mid;
88              } else {
89                  return mid + 1;
90              }
91          }
92      }
93  
94      private final class HandleImpl extends MaxMessageHandle {
95          private final int minIndex;
96          private final int maxIndex;
97          private final int minCapacity;
98          private final int maxCapacity;
99          private int index;
100         private int nextReceiveBufferSize;
101         private boolean decreaseNow;
102 
103         HandleImpl(int minIndex, int maxIndex, int initialIndex, int minCapacity, int maxCapacity) {
104             this.minIndex = minIndex;
105             this.maxIndex = maxIndex;
106 
107             index = initialIndex;
108             nextReceiveBufferSize = max(SIZE_TABLE[index], minCapacity);
109             this.minCapacity = minCapacity;
110             this.maxCapacity = maxCapacity;
111         }
112 
113         @Override
114         public void lastBytesRead(int bytes) {
115             // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
116             // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
117             // the selector to check for more data. Going back to the selector can add significant latency for large
118             // data transfers.
119             if (bytes == attemptedBytesRead()) {
120                 record(bytes);
121             }
122             super.lastBytesRead(bytes);
123         }
124 
125         @Override
126         public int guess() {
127             return nextReceiveBufferSize;
128         }
129 
130         private void record(int actualReadBytes) {
131             if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
132                 if (decreaseNow) {
133                     index = max(index - INDEX_DECREMENT, minIndex);
134                     nextReceiveBufferSize = max(SIZE_TABLE[index], minCapacity);
135                     decreaseNow = false;
136                 } else {
137                     decreaseNow = true;
138                 }
139             } else if (actualReadBytes >= nextReceiveBufferSize) {
140                 index = min(index + INDEX_INCREMENT, maxIndex);
141                 nextReceiveBufferSize = min(SIZE_TABLE[index], maxCapacity);
142                 decreaseNow = false;
143             }
144         }
145 
146         @Override
147         public void readComplete() {
148             record(totalBytesRead());
149         }
150     }
151 
152     private final int minIndex;
153     private final int maxIndex;
154     private final int initialIndex;
155     private final int minCapacity;
156     private final int maxCapacity;
157 
158     /**
159      * Creates a new predictor with the default parameters.  With the default
160      * parameters, the expected buffer size starts from {@code 1024}, does not
161      * go down below {@code 64}, and does not go up above {@code 65536}.
162      */
163     public AdaptiveRecvByteBufAllocator() {
164         this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM);
165     }
166 
167     /**
168      * Creates a new predictor with the specified parameters.
169      *
170      * @param minimum  the inclusive lower bound of the expected buffer size
171      * @param initial  the initial buffer size when no feed back was received
172      * @param maximum  the inclusive upper bound of the expected buffer size
173      */
174     public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
175         checkPositive(minimum, "minimum");
176         if (initial < minimum) {
177             throw new IllegalArgumentException("initial: " + initial);
178         }
179         if (maximum < initial) {
180             throw new IllegalArgumentException("maximum: " + maximum);
181         }
182 
183         int minIndex = getSizeTableIndex(minimum);
184         if (SIZE_TABLE[minIndex] < minimum) {
185             this.minIndex = minIndex + 1;
186         } else {
187             this.minIndex = minIndex;
188         }
189 
190         int maxIndex = getSizeTableIndex(maximum);
191         if (SIZE_TABLE[maxIndex] > maximum) {
192             this.maxIndex = maxIndex - 1;
193         } else {
194             this.maxIndex = maxIndex;
195         }
196 
197         int initialIndex = getSizeTableIndex(initial);
198         if (SIZE_TABLE[initialIndex] > initial) {
199             this.initialIndex = initialIndex - 1;
200         } else {
201             this.initialIndex = initialIndex;
202         }
203         this.minCapacity = minimum;
204         this.maxCapacity = maximum;
205     }
206 
207     @SuppressWarnings("deprecation")
208     @Override
209     public Handle newHandle() {
210         return new HandleImpl(minIndex, maxIndex, initialIndex, minCapacity, maxCapacity);
211     }
212 
213     @Override
214     public AdaptiveRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
215         super.respectMaybeMoreData(respectMaybeMoreData);
216         return this;
217     }
218 }