1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.example.objectecho;
17
18 import org.jboss.netty.channel.ChannelEvent;
19 import org.jboss.netty.channel.ChannelHandlerContext;
20 import org.jboss.netty.channel.ChannelState;
21 import org.jboss.netty.channel.ChannelStateEvent;
22 import org.jboss.netty.channel.ExceptionEvent;
23 import org.jboss.netty.channel.MessageEvent;
24 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
25
26 import java.util.ArrayList;
27 import java.util.List;
28 import java.util.concurrent.atomic.AtomicLong;
29
30
31
32
33
34
35 public class ObjectEchoClientHandler extends SimpleChannelUpstreamHandler {
36
37 private final List<Integer> firstMessage;
38 private final AtomicLong transferredMessages = new AtomicLong();
39
40
41
42
43 public ObjectEchoClientHandler(int firstMessageSize) {
44 if (firstMessageSize <= 0) {
45 throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
46 }
47 firstMessage = new ArrayList<Integer>(firstMessageSize);
48 for (int i = 0; i < firstMessageSize; i ++) {
49 firstMessage.add(i);
50 }
51 }
52
53 public long getTransferredMessages() {
54 return transferredMessages.get();
55 }
56
57 @Override
58 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
59 if (e instanceof ChannelStateEvent &&
60 ((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {
61 System.err.println(e);
62 }
63 super.handleUpstream(ctx, e);
64 }
65
66 @Override
67 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
68
69 e.getChannel().write(firstMessage);
70 }
71
72 @Override
73 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
74
75 transferredMessages.incrementAndGet();
76 e.getChannel().write(e.getMessage());
77 }
78
79 @Override
80 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
81 e.getCause().printStackTrace();
82 e.getChannel().close();
83 }
84 }