1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.redis;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.ByteToMessageDecoder;
21 import io.netty.util.ByteProcessor;
22 import io.netty.util.CharsetUtil;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.List;
26
27
28
29
30
31
32
33
34 @UnstableApi
35 public final class RedisDecoder extends ByteToMessageDecoder {
36
37 private final ToPositiveLongProcessor toPositiveLongProcessor = new ToPositiveLongProcessor();
38
39 private final boolean decodeInlineCommands;
40 private final int maxInlineMessageLength;
41 private final RedisMessagePool messagePool;
42
43
44 private State state = State.DECODE_TYPE;
45 private RedisMessageType type;
46 private int remainingBulkLength;
47
48 private enum State {
49 DECODE_TYPE,
50 DECODE_INLINE,
51 DECODE_LENGTH,
52 DECODE_BULK_STRING_EOL,
53 DECODE_BULK_STRING_CONTENT,
54 }
55
56
57
58
59
60 public RedisDecoder() {
61 this(false);
62 }
63
64
65
66
67
68 public RedisDecoder(boolean decodeInlineCommands) {
69 this(RedisConstants.REDIS_INLINE_MESSAGE_MAX_LENGTH, FixedRedisMessagePool.INSTANCE, decodeInlineCommands);
70 }
71
72
73
74
75
76
77 public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool) {
78 this(maxInlineMessageLength, messagePool, false);
79 }
80
81
82
83
84
85
86
87 public RedisDecoder(int maxInlineMessageLength, RedisMessagePool messagePool, boolean decodeInlineCommands) {
88 if (maxInlineMessageLength <= 0 || maxInlineMessageLength > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
89 throw new RedisCodecException("maxInlineMessageLength: " + maxInlineMessageLength +
90 " (expected: <= " + RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
91 }
92 this.maxInlineMessageLength = maxInlineMessageLength;
93 this.messagePool = messagePool;
94 this.decodeInlineCommands = decodeInlineCommands;
95 }
96
97 @Override
98 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
99 try {
100 for (;;) {
101 switch (state) {
102 case DECODE_TYPE:
103 if (!decodeType(in)) {
104 return;
105 }
106 break;
107 case DECODE_INLINE:
108 if (!decodeInline(in, out)) {
109 return;
110 }
111 break;
112 case DECODE_LENGTH:
113 if (!decodeLength(in, out)) {
114 return;
115 }
116 break;
117 case DECODE_BULK_STRING_EOL:
118 if (!decodeBulkStringEndOfLine(in, out)) {
119 return;
120 }
121 break;
122 case DECODE_BULK_STRING_CONTENT:
123 if (!decodeBulkStringContent(in, out)) {
124 return;
125 }
126 break;
127 default:
128 throw new RedisCodecException("Unknown state: " + state);
129 }
130 }
131 } catch (RedisCodecException e) {
132 resetDecoder();
133 throw e;
134 } catch (Exception e) {
135 resetDecoder();
136 throw new RedisCodecException(e);
137 }
138 }
139
140 private void resetDecoder() {
141 state = State.DECODE_TYPE;
142 remainingBulkLength = 0;
143 }
144
145 private boolean decodeType(ByteBuf in) throws Exception {
146 if (!in.isReadable()) {
147 return false;
148 }
149
150 type = RedisMessageType.readFrom(in, decodeInlineCommands);
151 state = type.isInline() ? State.DECODE_INLINE : State.DECODE_LENGTH;
152 return true;
153 }
154
155 private boolean decodeInline(ByteBuf in, List<Object> out) throws Exception {
156 ByteBuf lineBytes = readLine(in);
157 if (lineBytes == null) {
158 if (in.readableBytes() > maxInlineMessageLength) {
159 throw new RedisCodecException("length: " + in.readableBytes() +
160 " (expected: <= " + maxInlineMessageLength + ")");
161 }
162 return false;
163 }
164 out.add(newInlineRedisMessage(type, lineBytes));
165 resetDecoder();
166 return true;
167 }
168
169 private boolean decodeLength(ByteBuf in, List<Object> out) throws Exception {
170 ByteBuf lineByteBuf = readLine(in);
171 if (lineByteBuf == null) {
172 return false;
173 }
174 final long length = parseRedisNumber(lineByteBuf);
175 if (length < RedisConstants.NULL_VALUE) {
176 throw new RedisCodecException("length: " + length + " (expected: >= " + RedisConstants.NULL_VALUE + ")");
177 }
178 switch (type) {
179 case ARRAY_HEADER:
180 out.add(new ArrayHeaderRedisMessage(length));
181 resetDecoder();
182 return true;
183 case BULK_STRING:
184 if (length > RedisConstants.REDIS_MESSAGE_MAX_LENGTH) {
185 throw new RedisCodecException("length: " + length + " (expected: <= " +
186 RedisConstants.REDIS_MESSAGE_MAX_LENGTH + ")");
187 }
188 remainingBulkLength = (int) length;
189 return decodeBulkString(in, out);
190 default:
191 throw new RedisCodecException("bad type: " + type);
192 }
193 }
194
195 private boolean decodeBulkString(ByteBuf in, List<Object> out) throws Exception {
196 switch (remainingBulkLength) {
197 case RedisConstants.NULL_VALUE:
198 out.add(FullBulkStringRedisMessage.NULL_INSTANCE);
199 resetDecoder();
200 return true;
201 case 0:
202 state = State.DECODE_BULK_STRING_EOL;
203 return decodeBulkStringEndOfLine(in, out);
204 default:
205 out.add(new BulkStringHeaderRedisMessage(remainingBulkLength));
206 state = State.DECODE_BULK_STRING_CONTENT;
207 return decodeBulkStringContent(in, out);
208 }
209 }
210
211
212 private boolean decodeBulkStringEndOfLine(ByteBuf in, List<Object> out) throws Exception {
213 if (in.readableBytes() < RedisConstants.EOL_LENGTH) {
214 return false;
215 }
216 readEndOfLine(in);
217 out.add(FullBulkStringRedisMessage.EMPTY_INSTANCE);
218 resetDecoder();
219 return true;
220 }
221
222
223 private boolean decodeBulkStringContent(ByteBuf in, List<Object> out) throws Exception {
224 final int readableBytes = in.readableBytes();
225 if (readableBytes == 0 || remainingBulkLength == 0 && readableBytes < RedisConstants.EOL_LENGTH) {
226 return false;
227 }
228
229
230 if (readableBytes >= remainingBulkLength + RedisConstants.EOL_LENGTH) {
231 ByteBuf content = in.readSlice(remainingBulkLength);
232 readEndOfLine(in);
233
234 out.add(new DefaultLastBulkStringRedisContent(content.retain()));
235 resetDecoder();
236 return true;
237 }
238
239
240 int toRead = Math.min(remainingBulkLength, readableBytes);
241 remainingBulkLength -= toRead;
242 out.add(new DefaultBulkStringRedisContent(in.readSlice(toRead).retain()));
243 return true;
244 }
245
246 private static void readEndOfLine(final ByteBuf in) {
247 final short delim = in.readShort();
248 if (RedisConstants.EOL_SHORT == delim) {
249 return;
250 }
251 final byte[] bytes = RedisCodecUtil.shortToBytes(delim);
252 throw new RedisCodecException("delimiter: [" + bytes[0] + "," + bytes[1] + "] (expected: \\r\\n)");
253 }
254
255 private RedisMessage newInlineRedisMessage(RedisMessageType messageType, ByteBuf content) {
256 switch (messageType) {
257 case INLINE_COMMAND:
258 return new InlineCommandRedisMessage(content.toString(CharsetUtil.UTF_8));
259 case SIMPLE_STRING: {
260 SimpleStringRedisMessage cached = messagePool.getSimpleString(content);
261 return cached != null ? cached : new SimpleStringRedisMessage(content.toString(CharsetUtil.UTF_8));
262 }
263 case ERROR: {
264 ErrorRedisMessage cached = messagePool.getError(content);
265 return cached != null ? cached : new ErrorRedisMessage(content.toString(CharsetUtil.UTF_8));
266 }
267 case INTEGER: {
268 IntegerRedisMessage cached = messagePool.getInteger(content);
269 return cached != null ? cached : new IntegerRedisMessage(parseRedisNumber(content));
270 }
271 default:
272 throw new RedisCodecException("bad type: " + messageType);
273 }
274 }
275
276 private static ByteBuf readLine(ByteBuf in) {
277 if (!in.isReadable(RedisConstants.EOL_LENGTH)) {
278 return null;
279 }
280 final int lfIndex = in.indexOf(in.readerIndex(), in.writerIndex(), (byte) '\n');
281 if (lfIndex < 0) {
282 return null;
283 }
284 ByteBuf data = in.readSlice(lfIndex - in.readerIndex() - 1);
285 readEndOfLine(in);
286 return data;
287 }
288
289 private long parseRedisNumber(ByteBuf byteBuf) {
290 final int readableBytes = byteBuf.readableBytes();
291 final boolean negative = readableBytes > 0 && byteBuf.getByte(byteBuf.readerIndex()) == '-';
292 final int extraOneByteForNegative = negative ? 1 : 0;
293 if (readableBytes <= extraOneByteForNegative) {
294 throw new RedisCodecException("no number to parse: " + byteBuf.toString(CharsetUtil.US_ASCII));
295 }
296 if (readableBytes > RedisConstants.POSITIVE_LONG_MAX_LENGTH + extraOneByteForNegative) {
297 throw new RedisCodecException("too many characters to be a valid RESP Integer: " +
298 byteBuf.toString(CharsetUtil.US_ASCII));
299 }
300 if (negative) {
301 return -parsePositiveNumber(byteBuf.skipBytes(extraOneByteForNegative));
302 }
303 return parsePositiveNumber(byteBuf);
304 }
305
306 private long parsePositiveNumber(ByteBuf byteBuf) {
307 toPositiveLongProcessor.reset();
308 byteBuf.forEachByte(toPositiveLongProcessor);
309 return toPositiveLongProcessor.content();
310 }
311
312 private static final class ToPositiveLongProcessor implements ByteProcessor {
313 private long result;
314
315 @Override
316 public boolean process(byte value) throws Exception {
317 if (value < '0' || value > '9') {
318 throw new RedisCodecException("bad byte in number: " + value);
319 }
320 result = result * 10 + (value - '0');
321 return true;
322 }
323
324 public long content() {
325 return result;
326 }
327
328 public void reset() {
329 result = 0;
330 }
331 }
332 }