1 /*
2 * Copyright 2013 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.codec.memcache.binary;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.CombinedChannelDuplexHandler;
21 import io.netty.handler.codec.PrematureChannelClosureException;
22 import io.netty.handler.codec.memcache.LastMemcacheContent;
23 import io.netty.util.internal.UnstableApi;
24
25 import java.util.List;
26 import java.util.concurrent.atomic.AtomicLong;
27
/**
 * The client codec that combines the proper encoder and decoder.
 * <p>
 * Use this codec if you want to implement a memcache client that speaks the binary protocol. It
 * combines both the {@link BinaryMemcacheResponseDecoder} and the {@link BinaryMemcacheRequestEncoder}.
 * <p>
 * Optionally, it counts the number of outstanding responses and raises an exception if - on connection
 * close - that count is greater than 0 (this is turned off by default). You can also define a chunk
 * size for the content, which defaults to 8192. This chunk size is the maximum, so if smaller chunks
 * arrive they will be passed up the pipeline and not queued up to the chunk size.
 */
39 @UnstableApi
40 public final class BinaryMemcacheClientCodec extends
41 CombinedChannelDuplexHandler<BinaryMemcacheResponseDecoder, BinaryMemcacheRequestEncoder> {
42
43 private final boolean failOnMissingResponse;
44 private final AtomicLong requestResponseCounter = new AtomicLong();
45
46 /**
47 * Create a new {@link BinaryMemcacheClientCodec} with the default settings applied.
48 */
49 public BinaryMemcacheClientCodec() {
50 this(AbstractBinaryMemcacheDecoder.DEFAULT_MAX_CHUNK_SIZE);
51 }
52
53 /**
54 * Create a new {@link BinaryMemcacheClientCodec} and set a custom chunk size.
55 *
56 * @param decodeChunkSize the maximum chunk size.
57 */
58 public BinaryMemcacheClientCodec(int decodeChunkSize) {
59 this(decodeChunkSize, false);
60 }
61
62 /**
63 * Create a new {@link BinaryMemcacheClientCodec} with custom settings.
64 *
65 * @param decodeChunkSize the maximum chunk size.
66 * @param failOnMissingResponse report if after close there are outstanding requests.
67 */
68 public BinaryMemcacheClientCodec(int decodeChunkSize, boolean failOnMissingResponse) {
69 this.failOnMissingResponse = failOnMissingResponse;
70 init(new Decoder(decodeChunkSize), new Encoder());
71 }
72
73 private final class Encoder extends BinaryMemcacheRequestEncoder {
74
75 @Override
76 protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
77 super.encode(ctx, msg, out);
78
79 if (failOnMissingResponse && msg instanceof LastMemcacheContent) {
80 requestResponseCounter.incrementAndGet();
81 }
82 }
83 }
84
85 private final class Decoder extends BinaryMemcacheResponseDecoder {
86
87 Decoder(int chunkSize) {
88 super(chunkSize);
89 }
90
91 @Override
92 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
93 int oldSize = out.size();
94 super.decode(ctx, in, out);
95
96 if (failOnMissingResponse) {
97 final int size = out.size();
98 for (int i = oldSize; i < size; i ++) {
99 Object msg = out.get(i);
100 if (msg instanceof LastMemcacheContent) {
101 requestResponseCounter.decrementAndGet();
102 }
103 }
104 }
105 }
106
107 @Override
108 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
109 super.channelInactive(ctx);
110
111 if (failOnMissingResponse) {
112 long missingResponses = requestResponseCounter.get();
113 if (missingResponses > 0) {
114 ctx.fireExceptionCaught(new PrematureChannelClosureException(
115 "channel gone inactive with " + missingResponses +
116 " missing response(s)"));
117 }
118 }
119 }
120 }
121 }