查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositive;
19  
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.util.UncheckedBooleanSupplier;
23  
24  import java.util.AbstractMap;
25  import java.util.Map.Entry;
26  
27  /**
28   * The {@link RecvByteBufAllocator} that yields a buffer size prediction based upon decrementing the value from
29   * the max bytes per read.
30   */
31  public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufAllocator {
32      private volatile int maxBytesPerRead;
33      private volatile int maxBytesPerIndividualRead;
34  
35      private final class HandleImpl implements ExtendedHandle {
36          private int individualReadMax;
37          private int bytesToRead;
38          private int lastBytesRead;
39          private int attemptBytesRead;
40          private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
41              @Override
42              public boolean get() {
43                  return attemptBytesRead == lastBytesRead;
44              }
45          };
46  
47          @Override
48          public ByteBuf allocate(ByteBufAllocator alloc) {
49              return alloc.ioBuffer(guess());
50          }
51  
52          @Override
53          public int guess() {
54              return Math.min(individualReadMax, bytesToRead);
55          }
56  
57          @Override
58          public void reset(ChannelConfig config) {
59              bytesToRead = maxBytesPerRead();
60              individualReadMax = maxBytesPerIndividualRead();
61          }
62  
63          @Override
64          public void incMessagesRead(int amt) {
65          }
66  
67          @Override
68          public void lastBytesRead(int bytes) {
69              lastBytesRead = bytes;
70              // Ignore if bytes is negative, the interface contract states it will be detected externally after call.
71              // The value may be "invalid" after this point, but it doesn't matter because reading will be stopped.
72              bytesToRead -= bytes;
73          }
74  
75          @Override
76          public int lastBytesRead() {
77              return lastBytesRead;
78          }
79  
80          @Override
81          public boolean continueReading() {
82              return continueReading(defaultMaybeMoreSupplier);
83          }
84  
85          @Override
86          public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
87              // Keep reading if we are allowed to read more bytes, and our last read filled up the buffer we provided.
88              return bytesToRead > 0 && maybeMoreDataSupplier.get();
89          }
90  
91          @Override
92          public void readComplete() {
93          }
94  
95          @Override
96          public void attemptedBytesRead(int bytes) {
97              attemptBytesRead = bytes;
98          }
99  
100         @Override
101         public int attemptedBytesRead() {
102             return attemptBytesRead;
103         }
104     }
105 
106     public DefaultMaxBytesRecvByteBufAllocator() {
107         this(64 * 1024, 64 * 1024);
108     }
109 
110     public DefaultMaxBytesRecvByteBufAllocator(int maxBytesPerRead, int maxBytesPerIndividualRead) {
111         checkMaxBytesPerReadPair(maxBytesPerRead, maxBytesPerIndividualRead);
112         this.maxBytesPerRead = maxBytesPerRead;
113         this.maxBytesPerIndividualRead = maxBytesPerIndividualRead;
114     }
115 
116     @SuppressWarnings("deprecation")
117     @Override
118     public Handle newHandle() {
119         return new HandleImpl();
120     }
121 
122     @Override
123     public int maxBytesPerRead() {
124         return maxBytesPerRead;
125     }
126 
127     @Override
128     public DefaultMaxBytesRecvByteBufAllocator maxBytesPerRead(int maxBytesPerRead) {
129         checkPositive(maxBytesPerRead, "maxBytesPerRead");
130         // There is a dependency between this.maxBytesPerRead and this.maxBytesPerIndividualRead (a < b).
131         // Write operations must be synchronized, but independent read operations can just be volatile.
132         synchronized (this) {
133             final int maxBytesPerIndividualRead = maxBytesPerIndividualRead();
134             if (maxBytesPerRead < maxBytesPerIndividualRead) {
135                 throw new IllegalArgumentException(
136                         "maxBytesPerRead cannot be less than " +
137                                 "maxBytesPerIndividualRead (" + maxBytesPerIndividualRead + "): " + maxBytesPerRead);
138             }
139 
140             this.maxBytesPerRead = maxBytesPerRead;
141         }
142         return this;
143     }
144 
145     @Override
146     public int maxBytesPerIndividualRead() {
147         return maxBytesPerIndividualRead;
148     }
149 
150     @Override
151     public DefaultMaxBytesRecvByteBufAllocator maxBytesPerIndividualRead(int maxBytesPerIndividualRead) {
152         checkPositive(maxBytesPerIndividualRead, "maxBytesPerIndividualRead");
153         // There is a dependency between this.maxBytesPerRead and this.maxBytesPerIndividualRead (a < b).
154         // Write operations must be synchronized, but independent read operations can just be volatile.
155         synchronized (this) {
156             final int maxBytesPerRead = maxBytesPerRead();
157             if (maxBytesPerIndividualRead > maxBytesPerRead) {
158                 throw new IllegalArgumentException(
159                         "maxBytesPerIndividualRead cannot be greater than " +
160                                 "maxBytesPerRead (" + maxBytesPerRead + "): " + maxBytesPerIndividualRead);
161             }
162 
163             this.maxBytesPerIndividualRead = maxBytesPerIndividualRead;
164         }
165         return this;
166     }
167 
168     @Override
169     public synchronized Entry<Integer, Integer> maxBytesPerReadPair() {
170         return new AbstractMap.SimpleEntry<Integer, Integer>(maxBytesPerRead, maxBytesPerIndividualRead);
171     }
172 
173     private static void checkMaxBytesPerReadPair(int maxBytesPerRead, int maxBytesPerIndividualRead) {
174         checkPositive(maxBytesPerRead, "maxBytesPerRead");
175         checkPositive(maxBytesPerIndividualRead, "maxBytesPerIndividualRead");
176         if (maxBytesPerRead < maxBytesPerIndividualRead) {
177             throw new IllegalArgumentException(
178                     "maxBytesPerRead cannot be less than " +
179                             "maxBytesPerIndividualRead (" + maxBytesPerIndividualRead + "): " + maxBytesPerRead);
180         }
181     }
182 
183     @Override
184     public DefaultMaxBytesRecvByteBufAllocator maxBytesPerReadPair(int maxBytesPerRead,
185             int maxBytesPerIndividualRead) {
186         checkMaxBytesPerReadPair(maxBytesPerRead, maxBytesPerIndividualRead);
187         // There is a dependency between this.maxBytesPerRead and this.maxBytesPerIndividualRead (a < b).
188         // Write operations must be synchronized, but independent read operations can just be volatile.
189         synchronized (this) {
190             this.maxBytesPerRead = maxBytesPerRead;
191             this.maxBytesPerIndividualRead = maxBytesPerIndividualRead;
192         }
193         return this;
194     }
195 }