1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.memcache.binary;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.CombinedChannelDuplexHandler;
21 import io.netty.handler.codec.PrematureChannelClosureException;
22 import io.netty.handler.codec.memcache.LastMemcacheContent;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.List;
26 import java.util.concurrent.atomic.AtomicLong;
27
28
29
30
31
32
33
34
35
36
37
38
39 @UnstableApi
40 public final class BinaryMemcacheClientCodec extends
41 CombinedChannelDuplexHandler<BinaryMemcacheResponseDecoder, BinaryMemcacheRequestEncoder> {
42
43 private final boolean failOnMissingResponse;
44 private final AtomicLong requestResponseCounter = new AtomicLong();
45
46
47
48
49 public BinaryMemcacheClientCodec() {
50 this(AbstractBinaryMemcacheDecoder.DEFAULT_MAX_CHUNK_SIZE);
51 }
52
53
54
55
56
57
58 public BinaryMemcacheClientCodec(int decodeChunkSize) {
59 this(decodeChunkSize, false);
60 }
61
62
63
64
65
66
67
68 public BinaryMemcacheClientCodec(int decodeChunkSize, boolean failOnMissingResponse) {
69 this.failOnMissingResponse = failOnMissingResponse;
70 init(new Decoder(decodeChunkSize), new Encoder());
71 }
72
73 private final class Encoder extends BinaryMemcacheRequestEncoder {
74
75 @Override
76 protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
77 super.encode(ctx, msg, out);
78
79 if (failOnMissingResponse && msg instanceof LastMemcacheContent) {
80 requestResponseCounter.incrementAndGet();
81 }
82 }
83 }
84
85 private final class Decoder extends BinaryMemcacheResponseDecoder {
86
87 Decoder(int chunkSize) {
88 super(chunkSize);
89 }
90
91 @Override
92 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
93 int oldSize = out.size();
94 super.decode(ctx, in, out);
95
96 if (failOnMissingResponse) {
97 final int size = out.size();
98 for (int i = oldSize; i < size; i ++) {
99 Object msg = out.get(i);
100 if (msg instanceof LastMemcacheContent) {
101 requestResponseCounter.decrementAndGet();
102 }
103 }
104 }
105 }
106
107 @Override
108 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
109 super.channelInactive(ctx);
110
111 if (failOnMissingResponse) {
112 long missingResponses = requestResponseCounter.get();
113 if (missingResponses > 0) {
114 ctx.fireExceptionCaught(new PrematureChannelClosureException(
115 "channel gone inactive with " + missingResponses +
116 " missing response(s)"));
117 }
118 }
119 }
120 }
121 }