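// [Source-viewer navigation] View the API documentation for this class | Back to the source home page | Instant Messaging Network - Instant Messaging Developer Community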
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.memcache.binary;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
19  
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.handler.codec.DecoderResult;
24  import io.netty.handler.codec.memcache.AbstractMemcacheObjectDecoder;
25  import io.netty.handler.codec.memcache.DefaultLastMemcacheContent;
26  import io.netty.handler.codec.memcache.DefaultMemcacheContent;
27  import io.netty.handler.codec.memcache.LastMemcacheContent;
28  import io.netty.handler.codec.memcache.MemcacheContent;
29  import io.netty.util.internal.UnstableApi;
30  
31  import java.util.List;
32  
33  /**
34   * Decoder for both {@link BinaryMemcacheRequest} and {@link BinaryMemcacheResponse}.
35   * <p/>
36   * The difference in the protocols (header) is implemented by the subclasses.
37   */
38  @UnstableApi
39  public abstract class AbstractBinaryMemcacheDecoder<M extends BinaryMemcacheMessage>
40      extends AbstractMemcacheObjectDecoder {
41  
42      public static final int DEFAULT_MAX_CHUNK_SIZE = 8192;
43  
44      private final int chunkSize;
45  
46      private M currentMessage;
47      private int alreadyReadChunkSize;
48  
49      private State state = State.READ_HEADER;
50  
51      /**
52       * Create a new {@link AbstractBinaryMemcacheDecoder} with default settings.
53       */
54      protected AbstractBinaryMemcacheDecoder() {
55          this(DEFAULT_MAX_CHUNK_SIZE);
56      }
57  
58      /**
59       * Create a new {@link AbstractBinaryMemcacheDecoder} with custom settings.
60       *
61       * @param chunkSize the maximum chunk size of the payload.
62       */
63      protected AbstractBinaryMemcacheDecoder(int chunkSize) {
64          checkPositiveOrZero(chunkSize, "chunkSize");
65  
66          this.chunkSize = chunkSize;
67      }
68  
69      @Override
70      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
71          switch (state) {
72              case READ_HEADER: try {
73                  if (in.readableBytes() < 24) {
74                      return;
75                  }
76                  resetDecoder();
77  
78                  currentMessage = decodeHeader(in);
79                  state = State.READ_EXTRAS;
80              } catch (Exception e) {
81                  resetDecoder();
82                  out.add(invalidMessage(e));
83                  return;
84              }
85              case READ_EXTRAS: try {
86                  byte extrasLength = currentMessage.extrasLength();
87                  if (extrasLength > 0) {
88                      if (in.readableBytes() < extrasLength) {
89                          return;
90                      }
91  
92                      currentMessage.setExtras(in.readRetainedSlice(extrasLength));
93                  }
94  
95                  state = State.READ_KEY;
96              } catch (Exception e) {
97                  resetDecoder();
98                  out.add(invalidMessage(e));
99                  return;
100             }
101             case READ_KEY: try {
102                 short keyLength = currentMessage.keyLength();
103                 if (keyLength > 0) {
104                     if (in.readableBytes() < keyLength) {
105                         return;
106                     }
107 
108                     currentMessage.setKey(in.readRetainedSlice(keyLength));
109                 }
110                 out.add(currentMessage.retain());
111                 state = State.READ_CONTENT;
112             } catch (Exception e) {
113                 resetDecoder();
114                 out.add(invalidMessage(e));
115                 return;
116             }
117             case READ_CONTENT: try {
118                 int valueLength = currentMessage.totalBodyLength()
119                     - currentMessage.keyLength()
120                     - currentMessage.extrasLength();
121                 int toRead = in.readableBytes();
122                 if (valueLength > 0) {
123                     if (toRead == 0) {
124                         return;
125                     }
126 
127                     if (toRead > chunkSize) {
128                         toRead = chunkSize;
129                     }
130 
131                     int remainingLength = valueLength - alreadyReadChunkSize;
132                     if (toRead > remainingLength) {
133                         toRead = remainingLength;
134                     }
135 
136                     ByteBuf chunkBuffer = in.readRetainedSlice(toRead);
137 
138                     MemcacheContent chunk;
139                     if ((alreadyReadChunkSize += toRead) >= valueLength) {
140                         chunk = new DefaultLastMemcacheContent(chunkBuffer);
141                     } else {
142                         chunk = new DefaultMemcacheContent(chunkBuffer);
143                     }
144 
145                     out.add(chunk);
146                     if (alreadyReadChunkSize < valueLength) {
147                         return;
148                     }
149                 } else {
150                     out.add(LastMemcacheContent.EMPTY_LAST_CONTENT);
151                 }
152 
153                 resetDecoder();
154                 state = State.READ_HEADER;
155                 return;
156             } catch (Exception e) {
157                 resetDecoder();
158                 out.add(invalidChunk(e));
159                 return;
160             }
161             case BAD_MESSAGE:
162                 in.skipBytes(actualReadableBytes());
163                 return;
164             default:
165                 throw new Error("Unknown state reached: " + state);
166         }
167     }
168 
169     /**
170      * Helper method to create a message indicating a invalid decoding result.
171      *
172      * @param cause the cause of the decoding failure.
173      * @return a valid message indicating failure.
174      */
175     private M invalidMessage(Exception cause) {
176         state = State.BAD_MESSAGE;
177         M message = buildInvalidMessage();
178         message.setDecoderResult(DecoderResult.failure(cause));
179         return message;
180     }
181 
182     /**
183      * Helper method to create a content chunk indicating a invalid decoding result.
184      *
185      * @param cause the cause of the decoding failure.
186      * @return a valid content chunk indicating failure.
187      */
188     private MemcacheContent invalidChunk(Exception cause) {
189         state = State.BAD_MESSAGE;
190         MemcacheContent chunk = new DefaultLastMemcacheContent(Unpooled.EMPTY_BUFFER);
191         chunk.setDecoderResult(DecoderResult.failure(cause));
192         return chunk;
193     }
194 
195     /**
196      * When the channel goes inactive, release all frames to prevent data leaks.
197      *
198      * @param ctx handler context
199      * @throws Exception
200      */
201     @Override
202     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
203         super.channelInactive(ctx);
204 
205         resetDecoder();
206     }
207 
208     /**
209      * Prepare for next decoding iteration.
210      */
211     protected void resetDecoder() {
212         if (currentMessage != null) {
213             currentMessage.release();
214             currentMessage = null;
215         }
216         alreadyReadChunkSize = 0;
217     }
218 
219     /**
220      * Decode and return the parsed {@link BinaryMemcacheMessage}.
221      *
222      * @param in the incoming buffer.
223      * @return the decoded header.
224      */
225     protected abstract M decodeHeader(ByteBuf in);
226 
227     /**
228      * Helper method to create a upstream message when the incoming parsing did fail.
229      *
230      * @return a message indicating a decoding failure.
231      */
232     protected abstract M buildInvalidMessage();
233 
234     /**
235      * Contains all states this decoder can possibly be in.
236      * <p/>
237      * Note that most of the states can be optional, the only one required is reading
238      * the header ({@link #READ_HEADER}. All other steps depend on the length fields
239      * in the header and will be executed conditionally.
240      */
241     enum State {
242         /**
243          * Currently reading the header portion.
244          */
245         READ_HEADER,
246 
247         /**
248          * Currently reading the extras portion (optional).
249          */
250         READ_EXTRAS,
251 
252         /**
253          * Currently reading the key portion (optional).
254          */
255         READ_KEY,
256 
257         /**
258          * Currently reading the value chunks (optional).
259          */
260         READ_CONTENT,
261 
262         /**
263          * Something went wrong while decoding the message or chunks.
264          */
265         BAD_MESSAGE
266     }
267 
268 }