查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import org.jboss.netty.channel.ChannelEvent;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.ChannelFutureListener;
21  import org.jboss.netty.channel.ChannelPipeline;
22  import org.jboss.netty.channel.ChannelState;
23  import org.jboss.netty.channel.ChannelStateEvent;
24  import org.jboss.netty.channel.MessageEvent;
25  import org.jboss.netty.util.ThreadNameDeterminer;
26  import org.jboss.netty.util.ThreadRenamingRunnable;
27  import org.jboss.netty.util.internal.DeadLockProofWorker;
28  
29  import java.io.PushbackInputStream;
30  import java.net.ConnectException;
31  import java.net.SocketAddress;
32  import java.util.concurrent.Executor;
33  
34  import static org.jboss.netty.channel.Channels.*;
35  
36  class OioClientSocketPipelineSink extends AbstractOioChannelSink {
37  
38      private final Executor workerExecutor;
39      private final ThreadNameDeterminer determiner;
40  
41      OioClientSocketPipelineSink(Executor workerExecutor, ThreadNameDeterminer determiner) {
42          this.workerExecutor = workerExecutor;
43          this.determiner = determiner;
44      }
45  
46      public void eventSunk(
47              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
48          OioClientSocketChannel channel = (OioClientSocketChannel) e.getChannel();
49          ChannelFuture future = e.getFuture();
50          if (e instanceof ChannelStateEvent) {
51              ChannelStateEvent stateEvent = (ChannelStateEvent) e;
52              ChannelState state = stateEvent.getState();
53              Object value = stateEvent.getValue();
54              switch (state) {
55              case OPEN:
56                  if (Boolean.FALSE.equals(value)) {
57                      AbstractOioWorker.close(channel, future);
58                  }
59                  break;
60              case BOUND:
61                  if (value != null) {
62                      bind(channel, future, (SocketAddress) value);
63                  } else {
64                      AbstractOioWorker.close(channel, future);
65                  }
66                  break;
67              case CONNECTED:
68                  if (value != null) {
69                      connect(channel, future, (SocketAddress) value);
70                  } else {
71                      AbstractOioWorker.close(channel, future);
72                  }
73                  break;
74              case INTEREST_OPS:
75                  AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
76                  break;
77              }
78          } else if (e instanceof MessageEvent) {
79              OioWorker.write(
80                      channel, future,
81                      ((MessageEvent) e).getMessage());
82          }
83      }
84  
85      private static void bind(
86              OioClientSocketChannel channel, ChannelFuture future,
87              SocketAddress localAddress) {
88          try {
89              channel.socket.bind(localAddress);
90              future.setSuccess();
91              fireChannelBound(channel, channel.getLocalAddress());
92          } catch (Throwable t) {
93              future.setFailure(t);
94              fireExceptionCaught(channel, t);
95          }
96      }
97  
98      private void connect(
99              OioClientSocketChannel channel, ChannelFuture future,
100             SocketAddress remoteAddress) {
101 
102         boolean bound = channel.isBound();
103         boolean connected = false;
104         boolean workerStarted = false;
105 
106         future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
107 
108         try {
109             channel.socket.connect(
110                     remoteAddress, channel.getConfig().getConnectTimeoutMillis());
111             connected = true;
112 
113             // Obtain I/O stream.
114             channel.in = new PushbackInputStream(channel.socket.getInputStream(), 1);
115             channel.out = channel.socket.getOutputStream();
116 
117             // Fire events.
118             future.setSuccess();
119             if (!bound) {
120                 fireChannelBound(channel, channel.getLocalAddress());
121             }
122             fireChannelConnected(channel, channel.getRemoteAddress());
123 
124             // Start the business.
125             DeadLockProofWorker.start(
126                     workerExecutor,
127                     new ThreadRenamingRunnable(
128                             new OioWorker(channel),
129                             "Old I/O client worker (" + channel + ')',
130                             determiner));
131             workerStarted = true;
132         } catch (Throwable t) {
133             if (t instanceof ConnectException) {
134                 if (t instanceof ConnectException) {
135                     Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
136                     newT.setStackTrace(t.getStackTrace());
137                     t = newT;
138                 }
139             }
140             future.setFailure(t);
141             fireExceptionCaught(channel, t);
142         } finally {
143             if (connected && !workerStarted) {
144                 AbstractOioWorker.close(channel, future);
145             }
146         }
147     }
148 }