1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.example.factorial;
17
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelFutureListener;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.SimpleChannelInboundHandler;
22
23 import java.math.BigInteger;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.LinkedBlockingQueue;
26
27
28
29
30
31
32
33
34 public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {
35
36 private ChannelHandlerContext ctx;
37 private int receivedMessages;
38 private int next = 1;
39 final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();
40
41 public BigInteger getFactorial() {
42 boolean interrupted = false;
43 try {
44 for (;;) {
45 try {
46 return answer.take();
47 } catch (InterruptedException ignore) {
48 interrupted = true;
49 }
50 }
51 } finally {
52 if (interrupted) {
53 Thread.currentThread().interrupt();
54 }
55 }
56 }
57
58 @Override
59 public void channelActive(ChannelHandlerContext ctx) {
60 this.ctx = ctx;
61 sendNumbers();
62 }
63
64 @Override
65 public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
66 receivedMessages ++;
67 if (receivedMessages == FactorialClient.COUNT) {
68
69 ctx.channel().close().addListener(new ChannelFutureListener() {
70 @Override
71 public void operationComplete(ChannelFuture future) {
72 boolean offered = answer.offer(msg);
73 assert offered;
74 }
75 });
76 }
77 }
78
79 @Override
80 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
81 cause.printStackTrace();
82 ctx.close();
83 }
84
85 private void sendNumbers() {
86
87 ChannelFuture future = null;
88 for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
89 future = ctx.write(Integer.valueOf(next));
90 next++;
91 }
92 if (next <= FactorialClient.COUNT) {
93 assert future != null;
94 future.addListener(numberSender);
95 }
96 ctx.flush();
97 }
98
99 private final ChannelFutureListener numberSender = new ChannelFutureListener() {
100 @Override
101 public void operationComplete(ChannelFuture future) throws Exception {
102 if (future.isSuccess()) {
103 sendNumbers();
104 } else {
105 future.cause().printStackTrace();
106 future.channel().close();
107 }
108 }
109 };
110 }