查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelMetadata;
21  import io.netty.channel.ChannelOutboundBuffer;
22  import io.netty.channel.ChannelPipeline;
23  import io.netty.channel.EventLoop;
24  import io.netty.channel.ServerChannel;
25  import io.netty.util.internal.UnstableApi;
26  
27  import java.net.InetSocketAddress;
28  import java.net.SocketAddress;
29  
30  @UnstableApi
31  public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel implements ServerChannel {
32      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
33  
34      AbstractKQueueServerChannel(BsdSocket fd) {
35          this(fd, isSoErrorZero(fd));
36      }
37  
38      AbstractKQueueServerChannel(BsdSocket fd, boolean active) {
39          super(null, fd, active);
40      }
41  
42      @Override
43      public ChannelMetadata metadata() {
44          return METADATA;
45      }
46  
47      @Override
48      protected boolean isCompatible(EventLoop loop) {
49          return loop instanceof KQueueEventLoop;
50      }
51  
52      @Override
53      protected InetSocketAddress remoteAddress0() {
54          return null;
55      }
56  
57      @Override
58      protected AbstractKQueueUnsafe newUnsafe() {
59          return new KQueueServerSocketUnsafe();
60      }
61  
62      @Override
63      protected void doWrite(ChannelOutboundBuffer in) throws Exception {
64          throw new UnsupportedOperationException();
65      }
66  
67      @Override
68      protected Object filterOutboundMessage(Object msg) throws Exception {
69          throw new UnsupportedOperationException();
70      }
71  
72      abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
73  
74      @Override
75      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
76          throw new UnsupportedOperationException();
77      }
78  
79      final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
80          // Will hold the remote address after accept(...) was successful.
81          // We need 24 bytes for the address as maximum + 1 byte for storing the capacity.
82          // So use 26 bytes as it's a power of two.
83          private final byte[] acceptedAddress = new byte[26];
84  
85          @Override
86          void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
87              assert eventLoop().inEventLoop();
88              final ChannelConfig config = config();
89              if (shouldBreakReadReady(config)) {
90                  clearReadFilter0();
91                  return;
92              }
93              final ChannelPipeline pipeline = pipeline();
94              allocHandle.reset(config);
95              allocHandle.attemptedBytesRead(1);
96              readReadyBefore();
97  
98              Throwable exception = null;
99              try {
100                 try {
101                     do {
102                         int acceptFd = socket.accept(acceptedAddress);
103                         if (acceptFd == -1) {
104                             // this means everything was handled for now
105                             allocHandle.lastBytesRead(-1);
106                             break;
107                         }
108                         allocHandle.lastBytesRead(1);
109                         allocHandle.incMessagesRead(1);
110 
111                         readPending = false;
112                         pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
113                                                                  acceptedAddress[0]));
114                     } while (allocHandle.continueReading());
115                 } catch (Throwable t) {
116                     exception = t;
117                 }
118                 allocHandle.readComplete();
119                 pipeline.fireChannelReadComplete();
120 
121                 if (exception != null) {
122                     pipeline.fireExceptionCaught(exception);
123                 }
124             } finally {
125                 readReadyFinally(config);
126             }
127         }
128     }
129 }