查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.memcache.binary;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.channel.CombinedChannelDuplexHandler;
21  import io.netty.handler.codec.PrematureChannelClosureException;
22  import io.netty.handler.codec.memcache.LastMemcacheContent;
23  import io.netty.util.internal.UnstableApi;
24  
25  import java.util.List;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  /**
29   * The client codec that combines the proper encoder and decoder.
30   * <p/>
31   * Use this codec if you want to implement a memcache client that speaks the binary protocol. It
32   * combines both the {@link BinaryMemcacheResponseDecoder} and the {@link BinaryMemcacheRequestEncoder}.
33   * <p/>
34   * Optionally, it counts the number of outstanding responses and raises an exception if - on connection
35   * close - the list is not 0 (this is turned off by default). You can also define a chunk size for the
36   * content, which defaults to 8192. This chunk size is the maximum, so if smaller chunks arrive they
37   * will be passed up the pipeline and not queued up to the chunk size.
38   */
39  @UnstableApi
40  public final class BinaryMemcacheClientCodec extends
41          CombinedChannelDuplexHandler<BinaryMemcacheResponseDecoder, BinaryMemcacheRequestEncoder> {
42  
43      private final boolean failOnMissingResponse;
44      private final AtomicLong requestResponseCounter = new AtomicLong();
45  
46      /**
47       * Create a new {@link BinaryMemcacheClientCodec} with the default settings applied.
48       */
49      public BinaryMemcacheClientCodec() {
50          this(AbstractBinaryMemcacheDecoder.DEFAULT_MAX_CHUNK_SIZE);
51      }
52  
53      /**
54       * Create a new {@link BinaryMemcacheClientCodec} and set a custom chunk size.
55       *
56       * @param decodeChunkSize the maximum chunk size.
57       */
58      public BinaryMemcacheClientCodec(int decodeChunkSize) {
59          this(decodeChunkSize, false);
60      }
61  
62      /**
63       * Create a new {@link BinaryMemcacheClientCodec} with custom settings.
64       *
65       * @param decodeChunkSize       the maximum chunk size.
66       * @param failOnMissingResponse report if after close there are outstanding requests.
67       */
68      public BinaryMemcacheClientCodec(int decodeChunkSize, boolean failOnMissingResponse) {
69          this.failOnMissingResponse = failOnMissingResponse;
70          init(new Decoder(decodeChunkSize), new Encoder());
71      }
72  
73      private final class Encoder extends BinaryMemcacheRequestEncoder {
74  
75          @Override
76          protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
77              super.encode(ctx, msg, out);
78  
79              if (failOnMissingResponse && msg instanceof LastMemcacheContent) {
80                  requestResponseCounter.incrementAndGet();
81              }
82          }
83      }
84  
85      private final class Decoder extends BinaryMemcacheResponseDecoder {
86  
87          Decoder(int chunkSize) {
88              super(chunkSize);
89          }
90  
91          @Override
92          protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
93              int oldSize = out.size();
94              super.decode(ctx, in, out);
95  
96              if (failOnMissingResponse) {
97                  final int size = out.size();
98                  for (int i = oldSize; i < size; i ++) {
99                      Object msg = out.get(i);
100                     if (msg instanceof LastMemcacheContent) {
101                         requestResponseCounter.decrementAndGet();
102                     }
103                 }
104             }
105         }
106 
107         @Override
108         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
109             super.channelInactive(ctx);
110 
111             if (failOnMissingResponse) {
112                 long missingResponses = requestResponseCounter.get();
113                 if (missingResponses > 0) {
114                     ctx.fireExceptionCaught(new PrematureChannelClosureException(
115                         "channel gone inactive with " + missingResponses +
116                             " missing response(s)"));
117                 }
118             }
119         }
120     }
121 }