查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.ChannelEvent;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.ChannelFutureListener;
21  import org.jboss.netty.channel.ChannelPipeline;
22  import org.jboss.netty.channel.ChannelState;
23  import org.jboss.netty.channel.ChannelStateEvent;
24  import org.jboss.netty.channel.MessageEvent;
25  import org.jboss.netty.logging.InternalLogger;
26  import org.jboss.netty.logging.InternalLoggerFactory;
27  
28  import java.net.ConnectException;
29  import java.net.SocketAddress;
30  import java.nio.channels.ClosedChannelException;
31  
32  import static org.jboss.netty.channel.Channels.*;
33  
34  class NioClientSocketPipelineSink extends AbstractNioChannelSink {
35  
36      static final InternalLogger logger =
37          InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
38  
39      private final BossPool<NioClientBoss> bossPool;
40  
41      NioClientSocketPipelineSink(BossPool<NioClientBoss> bossPool) {
42          this.bossPool = bossPool;
43      }
44  
45      public void eventSunk(
46              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
47          if (e instanceof ChannelStateEvent) {
48              ChannelStateEvent event = (ChannelStateEvent) e;
49              NioClientSocketChannel channel =
50                  (NioClientSocketChannel) event.getChannel();
51              ChannelFuture future = event.getFuture();
52              ChannelState state = event.getState();
53              Object value = event.getValue();
54  
55              switch (state) {
56              case OPEN:
57                  if (Boolean.FALSE.equals(value)) {
58                      channel.worker.close(channel, future);
59                  }
60                  break;
61              case BOUND:
62                  if (value != null) {
63                      bind(channel, future, (SocketAddress) value);
64                  } else {
65                      channel.worker.close(channel, future);
66                  }
67                  break;
68              case CONNECTED:
69                  if (value != null) {
70                      connect(channel, future, (SocketAddress) value);
71                  } else {
72                      channel.worker.close(channel, future);
73                  }
74                  break;
75              case INTEREST_OPS:
76                  channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
77                  break;
78              }
79          } else if (e instanceof MessageEvent) {
80              MessageEvent event = (MessageEvent) e;
81              NioSocketChannel channel = (NioSocketChannel) event.getChannel();
82              boolean offered = channel.writeBufferQueue.offer(event);
83              assert offered;
84              channel.worker.writeFromUserCode(channel);
85          }
86      }
87  
88      private static void bind(
89              NioClientSocketChannel channel, ChannelFuture future,
90              SocketAddress localAddress) {
91          try {
92              channel.channel.socket().bind(localAddress);
93              channel.boundManually = true;
94              channel.setBound();
95              future.setSuccess();
96              fireChannelBound(channel, channel.getLocalAddress());
97          } catch (Throwable t) {
98              future.setFailure(t);
99              fireExceptionCaught(channel, t);
100         }
101     }
102 
103     private void connect(
104             final NioClientSocketChannel channel, final ChannelFuture cf,
105             SocketAddress remoteAddress) {
106         channel.requestedRemoteAddress = remoteAddress;
107         try {
108             if (channel.channel.connect(remoteAddress)) {
109                 channel.worker.register(channel, cf);
110             } else {
111                 channel.getCloseFuture().addListener(new ChannelFutureListener() {
112                     public void operationComplete(ChannelFuture f)
113                             throws Exception {
114                         if (!cf.isDone()) {
115                             cf.setFailure(new ClosedChannelException());
116                         }
117                     }
118                 });
119                 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
120                 channel.connectFuture = cf;
121                 nextBoss().register(channel, cf);
122             }
123 
124         } catch (Throwable t) {
125             if (t instanceof ConnectException) {
126                 Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
127                 newT.setStackTrace(t.getStackTrace());
128                 t = newT;
129             }
130             cf.setFailure(t);
131             fireExceptionCaught(channel, t);
132             channel.worker.close(channel, succeededFuture(channel));
133         }
134     }
135 
136     private NioClientBoss nextBoss() {
137         return bossPool.nextBoss();
138     }
139 
140 }