// View the API documentation for this class | Back to the source code home page | 即时通讯网 - Instant Messaging Developer Community
1   /*
2    * Copyright 2015 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositive;
19  
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.util.UncheckedBooleanSupplier;
23  
24  /**
25   * Default implementation of {@link MaxMessagesRecvByteBufAllocator} which respects {@link ChannelConfig#isAutoRead()}
26   * and also prevents overflow.
27   */
28  public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessagesRecvByteBufAllocator {
29      private final boolean ignoreBytesRead;
30      private volatile int maxMessagesPerRead;
31      private volatile boolean respectMaybeMoreData = true;
32  
33      public DefaultMaxMessagesRecvByteBufAllocator() {
34          this(1);
35      }
36  
37      public DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead) {
38          this(maxMessagesPerRead, false);
39      }
40  
41      DefaultMaxMessagesRecvByteBufAllocator(int maxMessagesPerRead, boolean ignoreBytesRead) {
42          this.ignoreBytesRead = ignoreBytesRead;
43          maxMessagesPerRead(maxMessagesPerRead);
44      }
45  
46      @Override
47      public int maxMessagesPerRead() {
48          return maxMessagesPerRead;
49      }
50  
51      @Override
52      public MaxMessagesRecvByteBufAllocator maxMessagesPerRead(int maxMessagesPerRead) {
53          checkPositive(maxMessagesPerRead, "maxMessagesPerRead");
54          this.maxMessagesPerRead = maxMessagesPerRead;
55          return this;
56      }
57  
58      /**
59       * Determine if future instances of {@link #newHandle()} will stop reading if we think there is no more data.
60       * @param respectMaybeMoreData
61       * <ul>
62       *     <li>{@code true} to stop reading if we think there is no more data. This may save a system call to read from
63       *          the socket, but if data has arrived in a racy fashion we may give up our {@link #maxMessagesPerRead()}
64       *          quantum and have to wait for the selector to notify us of more data.</li>
65       *     <li>{@code false} to keep reading (up to {@link #maxMessagesPerRead()}) or until there is no data when we
66       *          attempt to read.</li>
67       * </ul>
68       * @return {@code this}.
69       */
70      public DefaultMaxMessagesRecvByteBufAllocator respectMaybeMoreData(boolean respectMaybeMoreData) {
71          this.respectMaybeMoreData = respectMaybeMoreData;
72          return this;
73      }
74  
75      /**
76       * Get if future instances of {@link #newHandle()} will stop reading if we think there is no more data.
77       * @return
78       * <ul>
79       *     <li>{@code true} to stop reading if we think there is no more data. This may save a system call to read from
80       *          the socket, but if data has arrived in a racy fashion we may give up our {@link #maxMessagesPerRead()}
81       *          quantum and have to wait for the selector to notify us of more data.</li>
82       *     <li>{@code false} to keep reading (up to {@link #maxMessagesPerRead()}) or until there is no data when we
83       *          attempt to read.</li>
84       * </ul>
85       */
86      public final boolean respectMaybeMoreData() {
87          return respectMaybeMoreData;
88      }
89  
90      /**
91       * Focuses on enforcing the maximum messages per read condition for {@link #continueReading()}.
92       */
93      public abstract class MaxMessageHandle implements ExtendedHandle {
94          private ChannelConfig config;
95          private int maxMessagePerRead;
96          private int totalMessages;
97          private int totalBytesRead;
98          private int attemptedBytesRead;
99          private int lastBytesRead;
100         private final boolean respectMaybeMoreData = DefaultMaxMessagesRecvByteBufAllocator.this.respectMaybeMoreData;
101         private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() {
102             @Override
103             public boolean get() {
104                 return attemptedBytesRead == lastBytesRead;
105             }
106         };
107 
108         /**
109          * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used.
110          */
111         @Override
112         public void reset(ChannelConfig config) {
113             this.config = config;
114             maxMessagePerRead = maxMessagesPerRead();
115             totalMessages = totalBytesRead = 0;
116         }
117 
118         @Override
119         public ByteBuf allocate(ByteBufAllocator alloc) {
120             return alloc.ioBuffer(guess());
121         }
122 
123         @Override
124         public final void incMessagesRead(int amt) {
125             totalMessages += amt;
126         }
127 
128         @Override
129         public void lastBytesRead(int bytes) {
130             lastBytesRead = bytes;
131             if (bytes > 0) {
132                 totalBytesRead += bytes;
133             }
134         }
135 
136         @Override
137         public final int lastBytesRead() {
138             return lastBytesRead;
139         }
140 
141         @Override
142         public boolean continueReading() {
143             return continueReading(defaultMaybeMoreSupplier);
144         }
145 
146         @Override
147         public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
148             return config.isAutoRead() &&
149                    (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
150                    totalMessages < maxMessagePerRead && (ignoreBytesRead || totalBytesRead > 0);
151         }
152 
153         @Override
154         public void readComplete() {
155         }
156 
157         @Override
158         public int attemptedBytesRead() {
159             return attemptedBytesRead;
160         }
161 
162         @Override
163         public void attemptedBytesRead(int bytes) {
164             attemptedBytesRead = bytes;
165         }
166 
167         protected final int totalBytesRead() {
168             return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead;
169         }
170     }
171 }