查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.http;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.InetSocketAddress;
21  import java.net.SocketAddress;
22  import java.nio.channels.NotYetConnectedException;
23  
24  import javax.net.ssl.SSLContext;
25  import javax.net.ssl.SSLEngine;
26  
27  import org.jboss.netty.buffer.ChannelBuffer;
28  import org.jboss.netty.buffer.ChannelBuffers;
29  import org.jboss.netty.channel.AbstractChannel;
30  import org.jboss.netty.channel.ChannelException;
31  import org.jboss.netty.channel.ChannelFactory;
32  import org.jboss.netty.channel.ChannelFuture;
33  import org.jboss.netty.channel.ChannelFutureListener;
34  import org.jboss.netty.channel.ChannelHandlerContext;
35  import org.jboss.netty.channel.ChannelPipeline;
36  import org.jboss.netty.channel.ChannelSink;
37  import org.jboss.netty.channel.ChannelStateEvent;
38  import org.jboss.netty.channel.DefaultChannelPipeline;
39  import org.jboss.netty.channel.ExceptionEvent;
40  import org.jboss.netty.channel.MessageEvent;
41  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
42  import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
43  import org.jboss.netty.channel.socket.SocketChannel;
44  import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
45  import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
46  import org.jboss.netty.handler.codec.http.HttpChunk;
47  import org.jboss.netty.handler.codec.http.HttpHeaders;
48  import org.jboss.netty.handler.codec.http.HttpMethod;
49  import org.jboss.netty.handler.codec.http.HttpRequest;
50  import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
51  import org.jboss.netty.handler.codec.http.HttpResponse;
52  import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
53  import org.jboss.netty.handler.codec.http.HttpResponseStatus;
54  import org.jboss.netty.handler.codec.http.HttpVersion;
55  import org.jboss.netty.handler.ssl.SslHandler;
56  
57  /**
58   */
59  class HttpTunnelingClientSocketChannel extends AbstractChannel
60          implements SocketChannel {
61  
62      final HttpTunnelingSocketChannelConfig config;
63  
64      volatile boolean requestHeaderWritten;
65  
66      final Object interestOpsLock = new Object();
67  
68      final SocketChannel realChannel;
69  
70      private final ServletChannelHandler handler = new ServletChannelHandler();
71  
72      HttpTunnelingClientSocketChannel(
73              ChannelFactory factory,
74              ChannelPipeline pipeline,
75              ChannelSink sink, ClientSocketChannelFactory clientSocketChannelFactory) {
76  
77          super(null, factory, pipeline, sink);
78  
79          config = new HttpTunnelingSocketChannelConfig(this);
80          DefaultChannelPipeline channelPipeline = new DefaultChannelPipeline();
81          channelPipeline.addLast("decoder", new HttpResponseDecoder());
82          channelPipeline.addLast("encoder", new HttpRequestEncoder());
83          channelPipeline.addLast("handler", handler);
84          realChannel = clientSocketChannelFactory.newChannel(channelPipeline);
85  
86          fireChannelOpen(this);
87      }
88  
89      public HttpTunnelingSocketChannelConfig getConfig() {
90          return config;
91      }
92  
93      public InetSocketAddress getLocalAddress() {
94          return realChannel.getLocalAddress();
95      }
96  
97      public InetSocketAddress getRemoteAddress() {
98          return realChannel.getRemoteAddress();
99      }
100 
101     public boolean isBound() {
102         return realChannel.isBound();
103     }
104 
105     public boolean isConnected() {
106         return realChannel.isConnected();
107     }
108 
109     @Override
110     public int getInterestOps() {
111         return realChannel.getInterestOps();
112     }
113 
114     @Override
115     public boolean isWritable() {
116         return realChannel.isWritable();
117     }
118 
119     @Override
120     protected boolean setClosed() {
121         return super.setClosed();
122     }
123 
124     @Override
125     public ChannelFuture write(Object message, SocketAddress remoteAddress) {
126         if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
127             return super.write(message, null);
128         } else {
129             return getUnsupportedOperationFuture();
130         }
131     }
132 
133     void bindReal(final SocketAddress localAddress, final ChannelFuture future) {
134         realChannel.bind(localAddress).addListener(new ChannelFutureListener() {
135             public void operationComplete(ChannelFuture f) {
136                 if (f.isSuccess()) {
137                     future.setSuccess();
138                 } else {
139                     future.setFailure(f.getCause());
140                 }
141             }
142         });
143     }
144 
145     void connectReal(final SocketAddress remoteAddress, final ChannelFuture future) {
146         final SocketChannel virtualChannel = this;
147         realChannel.connect(remoteAddress).addListener(new ChannelFutureListener() {
148             public void operationComplete(ChannelFuture f) {
149                 final String serverName = config.getServerName();
150                 final int serverPort = ((InetSocketAddress) remoteAddress).getPort();
151                 final String serverPath = config.getServerPath();
152 
153                 if (f.isSuccess()) {
154                     // Configure SSL
155                     SSLContext sslContext = config.getSslContext();
156                     ChannelFuture sslHandshakeFuture = null;
157                     if (sslContext != null) {
158                         // Create a new SSLEngine from the specified SSLContext.
159                         SSLEngine engine;
160                         if (serverName != null) {
161                             engine = sslContext.createSSLEngine(serverName, serverPort);
162                         } else {
163                             engine = sslContext.createSSLEngine();
164                         }
165 
166                         // Configure the SSLEngine.
167                         engine.setUseClientMode(true);
168                         engine.setEnableSessionCreation(config.isEnableSslSessionCreation());
169                         String[] enabledCipherSuites = config.getEnabledSslCipherSuites();
170                         if (enabledCipherSuites != null) {
171                             engine.setEnabledCipherSuites(enabledCipherSuites);
172                         }
173                         String[] enabledProtocols = config.getEnabledSslProtocols();
174                         if (enabledProtocols != null) {
175                             engine.setEnabledProtocols(enabledProtocols);
176                         }
177 
178                         SslHandler sslHandler = new SslHandler(engine);
179                         realChannel.getPipeline().addFirst("ssl", sslHandler);
180                         sslHandshakeFuture = sslHandler.handshake();
181                     }
182 
183                     // Send the HTTP request.
184                     final HttpRequest req = new DefaultHttpRequest(
185                             HttpVersion.HTTP_1_1, HttpMethod.POST, serverPath);
186                     if (serverName != null) {
187                         req.headers().set(HttpHeaders.Names.HOST, serverName);
188                     }
189                     req.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
190                     req.headers().set(HttpHeaders.Names.TRANSFER_ENCODING, HttpHeaders.Values.CHUNKED);
191                     req.headers().set(HttpHeaders.Names.CONTENT_TRANSFER_ENCODING, HttpHeaders.Values.BINARY);
192                     req.headers().set(HttpHeaders.Names.USER_AGENT, HttpTunnelingClientSocketChannel.class.getName());
193 
194                     if (sslHandshakeFuture == null) {
195                         realChannel.write(req);
196                         requestHeaderWritten = true;
197                         future.setSuccess();
198                         fireChannelConnected(virtualChannel, remoteAddress);
199                     } else {
200                         sslHandshakeFuture.addListener(new ChannelFutureListener() {
201                             public void operationComplete(ChannelFuture f) {
202                                 if (f.isSuccess()) {
203                                     realChannel.write(req);
204                                     requestHeaderWritten = true;
205                                     future.setSuccess();
206                                     fireChannelConnected(virtualChannel, remoteAddress);
207                                 } else {
208                                     future.setFailure(f.getCause());
209                                     fireExceptionCaught(virtualChannel, f.getCause());
210                                 }
211                             }
212                         });
213                     }
214                 } else {
215                     future.setFailure(f.getCause());
216                     fireExceptionCaught(virtualChannel, f.getCause());
217                 }
218             }
219         });
220     }
221 
222     void writeReal(final ChannelBuffer a, final ChannelFuture future) {
223         if (!requestHeaderWritten) {
224             throw new NotYetConnectedException();
225         }
226 
227         final int size = a.readableBytes();
228         final ChannelFuture f;
229 
230         if (size == 0) {
231             f = realChannel.write(ChannelBuffers.EMPTY_BUFFER);
232         } else {
233             f = realChannel.write(new DefaultHttpChunk(a));
234         }
235 
236         f.addListener(new ChannelFutureListener() {
237             public void operationComplete(ChannelFuture f) {
238                 if (f.isSuccess()) {
239                     future.setSuccess();
240                     if (size != 0) {
241                         fireWriteComplete(HttpTunnelingClientSocketChannel.this, size);
242                     }
243                 } else {
244                     future.setFailure(f.getCause());
245                 }
246             }
247         });
248     }
249 
250     private ChannelFuture writeLastChunk() {
251         if (!requestHeaderWritten) {
252             return failedFuture(this, new NotYetConnectedException());
253         } else {
254             return realChannel.write(HttpChunk.LAST_CHUNK);
255         }
256     }
257 
258     void setInterestOpsReal(final int interestOps, final ChannelFuture future) {
259         realChannel.setInterestOps(interestOps).addListener(new ChannelFutureListener() {
260             public void operationComplete(ChannelFuture f) {
261                 if (f.isSuccess()) {
262                     future.setSuccess();
263                 } else {
264                     future.setFailure(f.getCause());
265                 }
266             }
267         });
268     }
269 
270     void disconnectReal(final ChannelFuture future) {
271         writeLastChunk().addListener(new ChannelFutureListener() {
272             public void operationComplete(ChannelFuture f) {
273                 realChannel.disconnect().addListener(new ChannelFutureListener() {
274                     public void operationComplete(ChannelFuture f) {
275                         if (f.isSuccess()) {
276                             future.setSuccess();
277                         } else {
278                             future.setFailure(f.getCause());
279                         }
280                     }
281                 });
282             }
283         });
284     }
285 
286     void unbindReal(final ChannelFuture future) {
287         writeLastChunk().addListener(new ChannelFutureListener() {
288             public void operationComplete(ChannelFuture f) {
289                 realChannel.unbind().addListener(new ChannelFutureListener() {
290                     public void operationComplete(ChannelFuture f) {
291                         if (f.isSuccess()) {
292                             future.setSuccess();
293                         } else {
294                             future.setFailure(f.getCause());
295                         }
296                     }
297                 });
298             }
299         });
300     }
301 
302     void closeReal(final ChannelFuture future) {
303         writeLastChunk().addListener(new ChannelFutureListener() {
304             public void operationComplete(ChannelFuture f) {
305                 realChannel.close().addListener(new ChannelFutureListener() {
306                     public void operationComplete(ChannelFuture f) {
307                         // Note: If 'future' refers to the closeFuture,
308                         // setSuccess() and setFailure() do nothing.
309                         // AbstractChannel.setClosed() should be called instead.
310                         // (See AbstractChannel.ChannelCloseFuture)
311 
312                         if (f.isSuccess()) {
313                             future.setSuccess();
314                         } else {
315                             future.setFailure(f.getCause());
316                         }
317 
318                         // Notify the closeFuture.
319                         setClosed();
320                     }
321                 });
322             }
323         });
324     }
325 
326     final class ServletChannelHandler extends SimpleChannelUpstreamHandler {
327 
328         private volatile boolean readingChunks;
329         final SocketChannel virtualChannel = HttpTunnelingClientSocketChannel.this;
330 
331         @Override
332         public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e)
333                 throws Exception {
334             fireChannelBound(virtualChannel, (SocketAddress) e.getValue());
335         }
336 
337         @Override
338         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
339             if (!readingChunks) {
340                 HttpResponse res = (HttpResponse) e.getMessage();
341                 if (res.getStatus().getCode() != HttpResponseStatus.OK.getCode()) {
342                     throw new ChannelException("Unexpected HTTP response status: " + res.getStatus());
343                 }
344 
345                 if (res.isChunked()) {
346                     readingChunks = true;
347                 } else {
348                     ChannelBuffer content = res.getContent();
349                     if (content.readable()) {
350                         fireMessageReceived(HttpTunnelingClientSocketChannel.this, content);
351                     }
352                     // Reached to the end of response - close the request.
353                     closeReal(succeededFuture(virtualChannel));
354                 }
355             } else {
356                 HttpChunk chunk = (HttpChunk) e.getMessage();
357                 if (!chunk.isLast()) {
358                     fireMessageReceived(HttpTunnelingClientSocketChannel.this, chunk.getContent());
359                 } else {
360                     readingChunks = false;
361                     // Reached to the end of response - close the request.
362                     closeReal(succeededFuture(virtualChannel));
363                 }
364             }
365         }
366 
367         @Override
368         public void channelInterestChanged(ChannelHandlerContext ctx,
369                 ChannelStateEvent e) throws Exception {
370             fireChannelInterestChanged(virtualChannel);
371         }
372 
373         @Override
374         public void channelDisconnected(ChannelHandlerContext ctx,
375                 ChannelStateEvent e) throws Exception {
376             fireChannelDisconnected(virtualChannel);
377         }
378 
379         @Override
380         public void channelUnbound(ChannelHandlerContext ctx,
381                 ChannelStateEvent e) throws Exception {
382             fireChannelUnbound(virtualChannel);
383         }
384 
385         @Override
386         public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
387                 throws Exception {
388             fireChannelClosed(virtualChannel);
389         }
390 
391         @Override
392         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
393             fireExceptionCaught(virtualChannel, e.getCause());
394             realChannel.close();
395         }
396     }
397 }