查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.ConnectTimeoutException;
21  import org.jboss.netty.util.ThreadNameDeterminer;
22  import org.jboss.netty.util.ThreadRenamingRunnable;
23  import org.jboss.netty.util.Timeout;
24  import org.jboss.netty.util.Timer;
25  import org.jboss.netty.util.TimerTask;
26  
27  import java.io.IOException;
28  import java.net.ConnectException;
29  import java.nio.channels.ClosedChannelException;
30  import java.nio.channels.SelectionKey;
31  import java.nio.channels.Selector;
32  import java.util.Iterator;
33  import java.util.Set;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.TimeUnit;
36  
37  import static org.jboss.netty.channel.Channels.*;
38  
39  /**
40   * {@link Boss} implementation that handles the  connection attempts of clients
41   */
42  public final class NioClientBoss extends AbstractNioSelector implements Boss {
43  
44      private final TimerTask wakeupTask = new TimerTask() {
45          public void run(Timeout timeout) throws Exception {
46              // This is needed to prevent a possible race that can lead to a NPE
47              // when the selector is closed before this is run
48              //
49              // See https://github.com/netty/netty/issues/685
50              Selector selector = NioClientBoss.this.selector;
51  
52              if (selector != null) {
53                  if (wakenUp.compareAndSet(false, true)) {
54                      selector.wakeup();
55                  }
56              }
57          }
58      };
59  
60      private final Timer timer;
61  
62      NioClientBoss(Executor bossExecutor, Timer timer, ThreadNameDeterminer determiner) {
63          super(bossExecutor, determiner);
64          this.timer = timer;
65      }
66  
67      @Override
68      protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
69          return new ThreadRenamingRunnable(this, "New I/O boss #" + id, determiner);
70      }
71  
72      @Override
73      protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
74          return new RegisterTask(this, (NioClientSocketChannel) channel);
75      }
76  
77      @Override
78      protected void process(Selector selector) {
79          processSelectedKeys(selector.selectedKeys());
80  
81          // Handle connection timeout every 10 milliseconds approximately.
82          long currentTimeNanos = System.nanoTime();
83          processConnectTimeout(selector.keys(), currentTimeNanos);
84      }
85  
86      private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
87  
88          // check if the set is empty and if so just return to not create garbage by
89          // creating a new Iterator every time even if there is nothing to process.
90          // See https://github.com/netty/netty/issues/597
91          if (selectedKeys.isEmpty()) {
92              return;
93          }
94          for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
95              SelectionKey k = i.next();
96              i.remove();
97  
98              if (!k.isValid()) {
99                  close(k);
100                 continue;
101             }
102 
103             try {
104                 if (k.isConnectable()) {
105                     connect(k);
106                 }
107             } catch (Throwable t) {
108                 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
109                 ch.connectFuture.setFailure(t);
110                 fireExceptionCaught(ch, t);
111                 k.cancel(); // Some JDK implementations run into an infinite loop without this.
112                 ch.worker.close(ch, succeededFuture(ch));
113             }
114         }
115     }
116 
117     private static void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
118         for (SelectionKey k: keys) {
119             if (!k.isValid()) {
120                 // Comment the close call again as it gave us major problems
121                 // with ClosedChannelExceptions.
122                 //
123                 // See:
124                 // * https://github.com/netty/netty/issues/142
125                 // * https://github.com/netty/netty/issues/138
126                 //
127                 // close(k);
128                 continue;
129             }
130 
131             NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
132             if (ch.connectDeadlineNanos > 0 &&
133                     currentTimeNanos >= ch.connectDeadlineNanos) {
134 
135                 // Create a new ConnectException everytime and not cache it as otherwise we end up with
136                 // using the wrong remoteaddress in the ConnectException message.
137                 //
138                 // See https://github.com/netty/netty/issues/2713
139                 ConnectException cause =
140                         new ConnectTimeoutException("connection timed out: " + ch.requestedRemoteAddress);
141 
142                 ch.connectFuture.setFailure(cause);
143                 fireExceptionCaught(ch, cause);
144                 ch.worker.close(ch, succeededFuture(ch));
145             }
146         }
147     }
148 
149     private static void connect(SelectionKey k) throws IOException {
150         NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
151         try {
152             if (ch.channel.finishConnect()) {
153                 k.cancel();
154                 if (ch.timoutTimer != null) {
155                     ch.timoutTimer.cancel();
156                 }
157                 ch.worker.register(ch, ch.connectFuture);
158             }
159         } catch (ConnectException e) {
160             ConnectException newE = new ConnectException(e.getMessage() + ": " + ch.requestedRemoteAddress);
161             newE.setStackTrace(e.getStackTrace());
162             throw newE;
163         }
164     }
165 
166     @Override
167     protected void close(SelectionKey k) {
168         NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
169         ch.worker.close(ch, succeededFuture(ch));
170     }
171 
172     private final class RegisterTask implements Runnable {
173         private final NioClientBoss boss;
174         private final NioClientSocketChannel channel;
175 
176         RegisterTask(NioClientBoss boss, NioClientSocketChannel channel) {
177             this.boss = boss;
178             this.channel = channel;
179         }
180 
181         public void run() {
182             int timeout = channel.getConfig().getConnectTimeoutMillis();
183             if (timeout > 0) {
184                 if (!channel.isConnected()) {
185                     channel.timoutTimer = timer.newTimeout(wakeupTask,
186                             timeout, TimeUnit.MILLISECONDS);
187                 }
188             }
189             try {
190                 channel.channel.register(
191                         boss.selector, SelectionKey.OP_CONNECT, channel);
192             } catch (ClosedChannelException e) {
193                 channel.worker.close(channel, succeededFuture(channel));
194             }
195 
196             int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
197             if (connectTimeout > 0) {
198                 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
199             }
200         }
201     }
202 }