查看本类的 API文档 | 回源码主页 | 即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.example.factorial;
17  
18  import io.netty.channel.ChannelFuture;
19  import io.netty.channel.ChannelFutureListener;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.channel.SimpleChannelInboundHandler;
22  
23  import java.math.BigInteger;
24  import java.util.concurrent.BlockingQueue;
25  import java.util.concurrent.LinkedBlockingQueue;
26  
27  /**
28   * Handler for a client-side channel.  This handler maintains stateful
29   * information which is specific to a certain channel using member variables.
30   * Therefore, an instance of this handler can cover only one channel.  You have
31   * to create a new handler instance whenever you create a new channel and insert
32   * this handler to avoid a race condition.
33   */
34  public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {
35  
36      private ChannelHandlerContext ctx;
37      private int receivedMessages;
38      private int next = 1;
39      final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();
40  
41      public BigInteger getFactorial() {
42          boolean interrupted = false;
43          try {
44              for (;;) {
45                  try {
46                      return answer.take();
47                  } catch (InterruptedException ignore) {
48                      interrupted = true;
49                  }
50              }
51          } finally {
52              if (interrupted) {
53                  Thread.currentThread().interrupt();
54              }
55          }
56      }
57  
58      @Override
59      public void channelActive(ChannelHandlerContext ctx) {
60          this.ctx = ctx;
61          sendNumbers();
62      }
63  
64      @Override
65      public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
66          receivedMessages ++;
67          if (receivedMessages == FactorialClient.COUNT) {
68              // Offer the answer after closing the connection.
69              ctx.channel().close().addListener(new ChannelFutureListener() {
70                  @Override
71                  public void operationComplete(ChannelFuture future) {
72                      boolean offered = answer.offer(msg);
73                      assert offered;
74                  }
75              });
76          }
77      }
78  
79      @Override
80      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
81          cause.printStackTrace();
82          ctx.close();
83      }
84  
85      private void sendNumbers() {
86          // Do not send more than 4096 numbers.
87          ChannelFuture future = null;
88          for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
89              future = ctx.write(Integer.valueOf(next));
90              next++;
91          }
92          if (next <= FactorialClient.COUNT) {
93              assert future != null;
94              future.addListener(numberSender);
95          }
96          ctx.flush();
97      }
98  
99      private final ChannelFutureListener numberSender = new ChannelFutureListener() {
100         @Override
101         public void operationComplete(ChannelFuture future) throws Exception {
102             if (future.isSuccess()) {
103                 sendNumbers();
104             } else {
105                 future.cause().printStackTrace();
106                 future.channel().close();
107             }
108         }
109     };
110 }