1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.redis;
17
18 import io.netty.channel.ChannelHandlerContext;
19 import io.netty.handler.codec.CodecException;
20 import io.netty.handler.codec.MessageToMessageDecoder;
21 import io.netty.util.ReferenceCountUtil;
22 import io.netty.util.internal.UnstableApi;
23
24 import java.util.ArrayDeque;
25 import java.util.ArrayList;
26 import java.util.Deque;
27 import java.util.List;
28
29
30
31
32
33 @UnstableApi
34 public final class RedisArrayAggregator extends MessageToMessageDecoder<RedisMessage> {
35
36 private final Deque<AggregateState> depths = new ArrayDeque<AggregateState>(4);
37
38 @Override
39 protected void decode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out) throws Exception {
40 if (msg instanceof ArrayHeaderRedisMessage) {
41 msg = decodeRedisArrayHeader((ArrayHeaderRedisMessage) msg);
42 if (msg == null) {
43 return;
44 }
45 } else {
46 ReferenceCountUtil.retain(msg);
47 }
48
49 while (!depths.isEmpty()) {
50 AggregateState current = depths.peek();
51 current.children.add(msg);
52
53
54 if (current.children.size() == current.length) {
55 msg = new ArrayRedisMessage(current.children);
56 depths.pop();
57 } else {
58
59 return;
60 }
61 }
62
63 out.add(msg);
64 }
65
66 private RedisMessage decodeRedisArrayHeader(ArrayHeaderRedisMessage header) {
67 if (header.isNull()) {
68 return ArrayRedisMessage.NULL_INSTANCE;
69 } else if (header.length() == 0L) {
70 return ArrayRedisMessage.EMPTY_INSTANCE;
71 } else if (header.length() > 0L) {
72
73 if (header.length() > Integer.MAX_VALUE) {
74 throw new CodecException("this codec doesn't support longer length than " + Integer.MAX_VALUE);
75 }
76
77
78 depths.push(new AggregateState((int) header.length()));
79 return null;
80 } else {
81 throw new CodecException("bad length: " + header.length());
82 }
83 }
84
85 private static final class AggregateState {
86 private final int length;
87 private final List<RedisMessage> children;
88 AggregateState(int length) {
89 this.length = length;
90 this.children = new ArrayList<RedisMessage>(length);
91 }
92 }
93 }