查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.Socket;
22  import java.net.SocketAddress;
23  import java.net.SocketTimeoutException;
24  import java.util.concurrent.Executor;
25  
26  import org.jboss.netty.channel.Channel;
27  import org.jboss.netty.channel.ChannelEvent;
28  import org.jboss.netty.channel.ChannelFuture;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.ChannelState;
31  import org.jboss.netty.channel.ChannelStateEvent;
32  import org.jboss.netty.channel.MessageEvent;
33  import org.jboss.netty.logging.InternalLogger;
34  import org.jboss.netty.logging.InternalLoggerFactory;
35  import org.jboss.netty.util.ThreadNameDeterminer;
36  import org.jboss.netty.util.ThreadRenamingRunnable;
37  import org.jboss.netty.util.internal.DeadLockProofWorker;
38  
39  class OioServerSocketPipelineSink extends AbstractOioChannelSink {
40  
41      static final InternalLogger logger =
42          InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);
43  
44      final Executor workerExecutor;
45      private final ThreadNameDeterminer determiner;
46  
47      OioServerSocketPipelineSink(Executor workerExecutor, ThreadNameDeterminer determiner) {
48          this.workerExecutor = workerExecutor;
49          this.determiner = determiner;
50      }
51  
52      public void eventSunk(
53              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
54          Channel channel = e.getChannel();
55          if (channel instanceof OioServerSocketChannel) {
56              handleServerSocket(e);
57          } else if (channel instanceof OioAcceptedSocketChannel) {
58              handleAcceptedSocket(e);
59          }
60      }
61  
62      private void handleServerSocket(ChannelEvent e) {
63          if (!(e instanceof ChannelStateEvent)) {
64              return;
65          }
66  
67          ChannelStateEvent event = (ChannelStateEvent) e;
68          OioServerSocketChannel channel =
69              (OioServerSocketChannel) event.getChannel();
70          ChannelFuture future = event.getFuture();
71          ChannelState state = event.getState();
72          Object value = event.getValue();
73  
74          switch (state) {
75          case OPEN:
76              if (Boolean.FALSE.equals(value)) {
77                  close(channel, future);
78              }
79              break;
80          case BOUND:
81              if (value != null) {
82                  bind(channel, future, (SocketAddress) value);
83              } else {
84                  close(channel, future);
85              }
86              break;
87          }
88      }
89  
90      private static void handleAcceptedSocket(ChannelEvent e) {
91          if (e instanceof ChannelStateEvent) {
92              ChannelStateEvent event = (ChannelStateEvent) e;
93              OioAcceptedSocketChannel channel =
94                  (OioAcceptedSocketChannel) event.getChannel();
95              ChannelFuture future = event.getFuture();
96              ChannelState state = event.getState();
97              Object value = event.getValue();
98  
99              switch (state) {
100             case OPEN:
101                 if (Boolean.FALSE.equals(value)) {
102                     AbstractOioWorker.close(channel, future);
103                 }
104                 break;
105             case BOUND:
106             case CONNECTED:
107                 if (value == null) {
108                     AbstractOioWorker.close(channel, future);
109                 }
110                 break;
111             case INTEREST_OPS:
112                 AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
113                 break;
114             }
115         } else if (e instanceof MessageEvent) {
116             MessageEvent event = (MessageEvent) e;
117             OioSocketChannel channel = (OioSocketChannel) event.getChannel();
118             ChannelFuture future = event.getFuture();
119             Object message = event.getMessage();
120             OioWorker.write(channel, future, message);
121         }
122     }
123 
124     private void bind(
125             OioServerSocketChannel channel, ChannelFuture future,
126             SocketAddress localAddress) {
127 
128         boolean bound = false;
129         boolean bossStarted = false;
130         try {
131             channel.socket.bind(localAddress, channel.getConfig().getBacklog());
132             bound = true;
133 
134             future.setSuccess();
135             localAddress = channel.getLocalAddress();
136             fireChannelBound(channel, localAddress);
137 
138             Executor bossExecutor =
139                 ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
140             DeadLockProofWorker.start(
141                     bossExecutor,
142                     new ThreadRenamingRunnable(
143                             new Boss(channel),
144                             "Old I/O server boss (" + channel + ')',
145                             determiner));
146             bossStarted = true;
147         } catch (Throwable t) {
148             future.setFailure(t);
149             fireExceptionCaught(channel, t);
150         } finally {
151             if (!bossStarted && bound) {
152                 close(channel, future);
153             }
154         }
155     }
156 
157     private static void close(OioServerSocketChannel channel, ChannelFuture future) {
158         boolean bound = channel.isBound();
159         try {
160             channel.socket.close();
161 
162             // Make sure the boss thread is not running so that that the future
163             // is notified after a new connection cannot be accepted anymore.
164             // See NETTY-256 for more information.
165             channel.shutdownLock.lock();
166             try {
167                 if (channel.setClosed()) {
168                     future.setSuccess();
169                     if (bound) {
170                         fireChannelUnbound(channel);
171                     }
172                     fireChannelClosed(channel);
173                 } else {
174                     future.setSuccess();
175                 }
176             } finally {
177                 channel.shutdownLock.unlock();
178             }
179         } catch (Throwable t) {
180             future.setFailure(t);
181             fireExceptionCaught(channel, t);
182         }
183     }
184 
185     private final class Boss implements Runnable {
186         private final OioServerSocketChannel channel;
187 
188         Boss(OioServerSocketChannel channel) {
189             this.channel = channel;
190         }
191 
192         public void run() {
193             channel.shutdownLock.lock();
194             try {
195                 while (channel.isBound()) {
196                     try {
197                         Socket acceptedSocket = channel.socket.accept();
198                         try {
199                             ChannelPipeline pipeline =
200                                 channel.getConfig().getPipelineFactory().getPipeline();
201                             final OioAcceptedSocketChannel acceptedChannel =
202                                 new OioAcceptedSocketChannel(
203                                         channel,
204                                         channel.getFactory(),
205                                         pipeline,
206                                         OioServerSocketPipelineSink.this,
207                                         acceptedSocket);
208                             DeadLockProofWorker.start(
209                                     workerExecutor,
210                                     new ThreadRenamingRunnable(
211                                             new OioWorker(acceptedChannel),
212                                             "Old I/O server worker (parentId: " +
213                                             channel.getId() + ", " + channel + ')',
214                                             determiner));
215                         } catch (Exception e) {
216                             if (logger.isWarnEnabled()) {
217                                 logger.warn(
218                                         "Failed to initialize an accepted socket.", e);
219                             }
220 
221                             try {
222                                 acceptedSocket.close();
223                             } catch (IOException e2) {
224                                 if (logger.isWarnEnabled()) {
225                                     logger.warn(
226                                             "Failed to close a partially accepted socket.",
227                                             e2);
228                                 }
229                             }
230                         }
231                     } catch (SocketTimeoutException e) {
232                         // Thrown every second to stop when requested.
233                     } catch (Throwable e) {
234                         // Do not log the exception if the server socket was closed
235                         // by a user.
236                         if (!channel.socket.isBound() || channel.socket.isClosed()) {
237                             break;
238                         }
239                         if (logger.isWarnEnabled()) {
240                             logger.warn(
241                                     "Failed to accept a connection.", e);
242                         }
243                         try {
244                             Thread.sleep(1000);
245                         } catch (InterruptedException e1) {
246                             // Ignore
247                         }
248                     }
249                 }
250             } finally {
251                 channel.shutdownLock.unlock();
252             }
253         }
254     }
255 }