1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.example.proxy;
17
18 import org.jboss.netty.bootstrap.ClientBootstrap;
19 import org.jboss.netty.buffer.ChannelBuffer;
20 import org.jboss.netty.buffer.ChannelBuffers;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ChannelFutureListener;
24 import org.jboss.netty.channel.ChannelHandlerContext;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.ExceptionEvent;
27 import org.jboss.netty.channel.MessageEvent;
28 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
29 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
30
31 import java.net.InetSocketAddress;
32
33 public class HexDumpProxyInboundHandler extends SimpleChannelUpstreamHandler {
34
35 private final ClientSocketChannelFactory cf;
36
37
38
39
40 final Object trafficLock = new Object();
41
42 private volatile Channel outboundChannel;
43
44 public HexDumpProxyInboundHandler(ClientSocketChannelFactory cf) {
45 this.cf = cf;
46 }
47
48 @Override
49 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
50
51 final Channel inboundChannel = e.getChannel();
52 inboundChannel.setReadable(false);
53
54
55 ClientBootstrap cb = new ClientBootstrap(cf);
56 cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));
57 ChannelFuture f = cb.connect(new InetSocketAddress(HexDumpProxy.REMOTE_HOST, HexDumpProxy.REMOTE_PORT));
58
59 outboundChannel = f.getChannel();
60 f.addListener(new ChannelFutureListener() {
61 public void operationComplete(ChannelFuture future) {
62 if (future.isSuccess()) {
63
64
65 inboundChannel.setReadable(true);
66 } else {
67
68 inboundChannel.close();
69 }
70 }
71 });
72 }
73
74 @Override
75 public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
76 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
77
78 synchronized (trafficLock) {
79 outboundChannel.write(msg);
80
81
82 if (!outboundChannel.isWritable()) {
83 e.getChannel().setReadable(false);
84 }
85 }
86 }
87
88 @Override
89 public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
90
91
92 synchronized (trafficLock) {
93 if (e.getChannel().isWritable()) {
94 if (outboundChannel != null) {
95 outboundChannel.setReadable(true);
96 }
97 }
98 }
99 }
100
101 @Override
102 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
103 if (outboundChannel != null) {
104 closeOnFlush(outboundChannel);
105 }
106 }
107
108 @Override
109 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
110 e.getCause().printStackTrace();
111 closeOnFlush(e.getChannel());
112 }
113
114 private class OutboundHandler extends SimpleChannelUpstreamHandler {
115
116 private final Channel inboundChannel;
117
118 OutboundHandler(Channel inboundChannel) {
119 this.inboundChannel = inboundChannel;
120 }
121
122 @Override
123 public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
124 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
125
126 synchronized (trafficLock) {
127 inboundChannel.write(msg);
128
129
130 if (!inboundChannel.isWritable()) {
131 e.getChannel().setReadable(false);
132 }
133 }
134 }
135
136 @Override
137 public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
138
139
140 synchronized (trafficLock) {
141 if (e.getChannel().isWritable()) {
142 inboundChannel.setReadable(true);
143 }
144 }
145 }
146
147 @Override
148 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
149 closeOnFlush(inboundChannel);
150 }
151
152 @Override
153 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
154 e.getCause().printStackTrace();
155 closeOnFlush(e.getChannel());
156 }
157 }
158
159
160
161
162 static void closeOnFlush(Channel ch) {
163 if (ch.isConnected()) {
164 ch.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
165 }
166 }
167 }