查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.local;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import org.jboss.netty.channel.AbstractChannelSink;
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelEvent;
23  import org.jboss.netty.channel.ChannelException;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelPipeline;
26  import org.jboss.netty.channel.ChannelState;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.MessageEvent;
29  
30  final class LocalServerChannelSink extends AbstractChannelSink {
31  
32      public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
33          Channel channel = e.getChannel();
34          if (channel instanceof DefaultLocalServerChannel) {
35              handleServerChannel(e);
36          } else if (channel instanceof DefaultLocalChannel) {
37              handleAcceptedChannel(e);
38          }
39      }
40  
41      private static void handleServerChannel(ChannelEvent e) {
42          if (!(e instanceof ChannelStateEvent)) {
43              return;
44          }
45  
46          ChannelStateEvent event = (ChannelStateEvent) e;
47          DefaultLocalServerChannel channel =
48                (DefaultLocalServerChannel) event.getChannel();
49          ChannelFuture future = event.getFuture();
50          ChannelState state = event.getState();
51          Object value = event.getValue();
52          switch (state) {
53          case OPEN:
54              if (Boolean.FALSE.equals(value)) {
55                  close(channel, future);
56              }
57              break;
58          case BOUND:
59              if (value != null) {
60                  bind(channel, future, (LocalAddress) value);
61              } else {
62                  close(channel, future);
63              }
64              break;
65          }
66      }
67  
68      private static void handleAcceptedChannel(ChannelEvent e) {
69          if (e instanceof ChannelStateEvent) {
70              ChannelStateEvent event = (ChannelStateEvent) e;
71              DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
72              ChannelFuture future = event.getFuture();
73              ChannelState state = event.getState();
74              Object value = event.getValue();
75  
76              switch (state) {
77              case OPEN:
78                  if (Boolean.FALSE.equals(value)) {
79                      channel.closeNow(future);
80                  }
81                  break;
82              case BOUND:
83              case CONNECTED:
84                  if (value == null) {
85                      channel.closeNow(future);
86                  }
87                  break;
88              case INTEREST_OPS:
89                  // Unsupported - discard silently.
90                  future.setSuccess();
91                  break;
92              }
93          } else if (e instanceof MessageEvent) {
94              MessageEvent event = (MessageEvent) e;
95              DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
96              boolean offered = channel.writeBuffer.offer(event);
97              assert offered;
98              channel.flushWriteBuffer();
99          }
100     }
101 
102     private static void bind(DefaultLocalServerChannel channel, ChannelFuture future, LocalAddress localAddress) {
103         try {
104             if (!LocalChannelRegistry.register(localAddress, channel)) {
105                 throw new ChannelException("address already in use: " + localAddress);
106             }
107             if (!channel.bound.compareAndSet(false, true)) {
108                 throw new ChannelException("already bound");
109             }
110 
111             channel.localAddress = localAddress;
112             future.setSuccess();
113             fireChannelBound(channel, localAddress);
114         } catch (Throwable t) {
115             LocalChannelRegistry.unregister(localAddress);
116             future.setFailure(t);
117             fireExceptionCaught(channel, t);
118         }
119     }
120 
121     private static void close(DefaultLocalServerChannel channel, ChannelFuture future) {
122         try {
123             if (channel.setClosed()) {
124                 future.setSuccess();
125                 LocalAddress localAddress = channel.localAddress;
126                 if (channel.bound.compareAndSet(true, false)) {
127                     channel.localAddress = null;
128                     LocalChannelRegistry.unregister(localAddress);
129                     fireChannelUnbound(channel);
130                 }
131                 fireChannelClosed(channel);
132             } else {
133                 future.setSuccess();
134             }
135         } catch (Throwable t) {
136             future.setFailure(t);
137             fireExceptionCaught(channel, t);
138         }
139     }
140 }