1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.memcache.binary;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
19
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.handler.codec.DecoderResult;
24 import io.netty.handler.codec.memcache.AbstractMemcacheObjectDecoder;
25 import io.netty.handler.codec.memcache.DefaultLastMemcacheContent;
26 import io.netty.handler.codec.memcache.DefaultMemcacheContent;
27 import io.netty.handler.codec.memcache.LastMemcacheContent;
28 import io.netty.handler.codec.memcache.MemcacheContent;
29 import io.netty.util.internal.UnstableApi;
30
31 import java.util.List;
32
33
34
35
36
37
38 @UnstableApi
39 public abstract class AbstractBinaryMemcacheDecoder<M extends BinaryMemcacheMessage>
40 extends AbstractMemcacheObjectDecoder {
41
42 public static final int DEFAULT_MAX_CHUNK_SIZE = 8192;
43
44 private final int chunkSize;
45
46 private M currentMessage;
47 private int alreadyReadChunkSize;
48
49 private State state = State.READ_HEADER;
50
51
52
53
54 protected AbstractBinaryMemcacheDecoder() {
55 this(DEFAULT_MAX_CHUNK_SIZE);
56 }
57
58
59
60
61
62
63 protected AbstractBinaryMemcacheDecoder(int chunkSize) {
64 checkPositiveOrZero(chunkSize, "chunkSize");
65
66 this.chunkSize = chunkSize;
67 }
68
69 @Override
70 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
71 switch (state) {
72 case READ_HEADER: try {
73 if (in.readableBytes() < 24) {
74 return;
75 }
76 resetDecoder();
77
78 currentMessage = decodeHeader(in);
79 state = State.READ_EXTRAS;
80 } catch (Exception e) {
81 resetDecoder();
82 out.add(invalidMessage(e));
83 return;
84 }
85 case READ_EXTRAS: try {
86 byte extrasLength = currentMessage.extrasLength();
87 if (extrasLength > 0) {
88 if (in.readableBytes() < extrasLength) {
89 return;
90 }
91
92 currentMessage.setExtras(in.readRetainedSlice(extrasLength));
93 }
94
95 state = State.READ_KEY;
96 } catch (Exception e) {
97 resetDecoder();
98 out.add(invalidMessage(e));
99 return;
100 }
101 case READ_KEY: try {
102 short keyLength = currentMessage.keyLength();
103 if (keyLength > 0) {
104 if (in.readableBytes() < keyLength) {
105 return;
106 }
107
108 currentMessage.setKey(in.readRetainedSlice(keyLength));
109 }
110 out.add(currentMessage.retain());
111 state = State.READ_CONTENT;
112 } catch (Exception e) {
113 resetDecoder();
114 out.add(invalidMessage(e));
115 return;
116 }
117 case READ_CONTENT: try {
118 int valueLength = currentMessage.totalBodyLength()
119 - currentMessage.keyLength()
120 - currentMessage.extrasLength();
121 int toRead = in.readableBytes();
122 if (valueLength > 0) {
123 if (toRead == 0) {
124 return;
125 }
126
127 if (toRead > chunkSize) {
128 toRead = chunkSize;
129 }
130
131 int remainingLength = valueLength - alreadyReadChunkSize;
132 if (toRead > remainingLength) {
133 toRead = remainingLength;
134 }
135
136 ByteBuf chunkBuffer = in.readRetainedSlice(toRead);
137
138 MemcacheContent chunk;
139 if ((alreadyReadChunkSize += toRead) >= valueLength) {
140 chunk = new DefaultLastMemcacheContent(chunkBuffer);
141 } else {
142 chunk = new DefaultMemcacheContent(chunkBuffer);
143 }
144
145 out.add(chunk);
146 if (alreadyReadChunkSize < valueLength) {
147 return;
148 }
149 } else {
150 out.add(LastMemcacheContent.EMPTY_LAST_CONTENT);
151 }
152
153 resetDecoder();
154 state = State.READ_HEADER;
155 return;
156 } catch (Exception e) {
157 resetDecoder();
158 out.add(invalidChunk(e));
159 return;
160 }
161 case BAD_MESSAGE:
162 in.skipBytes(actualReadableBytes());
163 return;
164 default:
165 throw new Error("Unknown state reached: " + state);
166 }
167 }
168
169
170
171
172
173
174
175 private M invalidMessage(Exception cause) {
176 state = State.BAD_MESSAGE;
177 M message = buildInvalidMessage();
178 message.setDecoderResult(DecoderResult.failure(cause));
179 return message;
180 }
181
182
183
184
185
186
187
188 private MemcacheContent invalidChunk(Exception cause) {
189 state = State.BAD_MESSAGE;
190 MemcacheContent chunk = new DefaultLastMemcacheContent(Unpooled.EMPTY_BUFFER);
191 chunk.setDecoderResult(DecoderResult.failure(cause));
192 return chunk;
193 }
194
195
196
197
198
199
200
201 @Override
202 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
203 super.channelInactive(ctx);
204
205 resetDecoder();
206 }
207
208
209
210
211 protected void resetDecoder() {
212 if (currentMessage != null) {
213 currentMessage.release();
214 currentMessage = null;
215 }
216 alreadyReadChunkSize = 0;
217 }
218
219
220
221
222
223
224
225 protected abstract M decodeHeader(ByteBuf in);
226
227
228
229
230
231
232 protected abstract M buildInvalidMessage();
233
234
235
236
237
238
239
240
241 enum State {
242
243
244
245 READ_HEADER,
246
247
248
249
250 READ_EXTRAS,
251
252
253
254
255 READ_KEY,
256
257
258
259
260 READ_CONTENT,
261
262
263
264
265 BAD_MESSAGE
266 }
267
268 }