1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelMetadata;
21 import io.netty.channel.ChannelOutboundBuffer;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.EventLoop;
24 import io.netty.channel.ServerChannel;
25 import io.netty.util.internal.UnstableApi;
26
27 import java.net.InetSocketAddress;
28 import java.net.SocketAddress;
29
30 @UnstableApi
31 public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel implements ServerChannel {
32 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
33
34 AbstractKQueueServerChannel(BsdSocket fd) {
35 this(fd, isSoErrorZero(fd));
36 }
37
38 AbstractKQueueServerChannel(BsdSocket fd, boolean active) {
39 super(null, fd, active);
40 }
41
42 @Override
43 public ChannelMetadata metadata() {
44 return METADATA;
45 }
46
47 @Override
48 protected boolean isCompatible(EventLoop loop) {
49 return loop instanceof KQueueEventLoop;
50 }
51
52 @Override
53 protected InetSocketAddress remoteAddress0() {
54 return null;
55 }
56
57 @Override
58 protected AbstractKQueueUnsafe newUnsafe() {
59 return new KQueueServerSocketUnsafe();
60 }
61
62 @Override
63 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
64 throw new UnsupportedOperationException();
65 }
66
67 @Override
68 protected Object filterOutboundMessage(Object msg) throws Exception {
69 throw new UnsupportedOperationException();
70 }
71
72 abstract Channel newChildChannel(int fd, byte[] remote, int offset, int len) throws Exception;
73
74 @Override
75 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
76 throw new UnsupportedOperationException();
77 }
78
79 final class KQueueServerSocketUnsafe extends AbstractKQueueUnsafe {
80
81
82
83 private final byte[] acceptedAddress = new byte[26];
84
85 @Override
86 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
87 assert eventLoop().inEventLoop();
88 final ChannelConfig config = config();
89 if (shouldBreakReadReady(config)) {
90 clearReadFilter0();
91 return;
92 }
93 final ChannelPipeline pipeline = pipeline();
94 allocHandle.reset(config);
95 allocHandle.attemptedBytesRead(1);
96 readReadyBefore();
97
98 Throwable exception = null;
99 try {
100 try {
101 do {
102 int acceptFd = socket.accept(acceptedAddress);
103 if (acceptFd == -1) {
104
105 allocHandle.lastBytesRead(-1);
106 break;
107 }
108 allocHandle.lastBytesRead(1);
109 allocHandle.incMessagesRead(1);
110
111 readPending = false;
112 pipeline.fireChannelRead(newChildChannel(acceptFd, acceptedAddress, 1,
113 acceptedAddress[0]));
114 } while (allocHandle.continueReading());
115 } catch (Throwable t) {
116 exception = t;
117 }
118 allocHandle.readComplete();
119 pipeline.fireChannelReadComplete();
120
121 if (exception != null) {
122 pipeline.fireExceptionCaught(exception);
123 }
124 } finally {
125 readReadyFinally(config);
126 }
127 }
128 }
129 }