1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.unix.DomainSocketAddress;
23 import io.netty.channel.unix.DomainSocketChannel;
24 import io.netty.channel.unix.FileDescriptor;
25 import io.netty.channel.unix.PeerCredentials;
26 import io.netty.util.internal.UnstableApi;
27
28 import java.io.IOException;
29 import java.net.SocketAddress;
30
31 import static io.netty.channel.kqueue.BsdSocket.newSocketDomain;
32
33 @UnstableApi
34 public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel implements DomainSocketChannel {
35 private final KQueueDomainSocketChannelConfig config = new KQueueDomainSocketChannelConfig(this);
36
37 private volatile DomainSocketAddress local;
38 private volatile DomainSocketAddress remote;
39
40 public KQueueDomainSocketChannel() {
41 super(null, newSocketDomain(), false);
42 }
43
44 public KQueueDomainSocketChannel(int fd) {
45 this(null, new BsdSocket(fd));
46 }
47
48 KQueueDomainSocketChannel(Channel parent, BsdSocket fd) {
49 super(parent, fd, true);
50 local = fd.localDomainSocketAddress();
51 remote = fd.remoteDomainSocketAddress();
52 }
53
54 @Override
55 protected AbstractKQueueUnsafe newUnsafe() {
56 return new KQueueDomainUnsafe();
57 }
58
59 @Override
60 protected DomainSocketAddress localAddress0() {
61 return local;
62 }
63
64 @Override
65 protected DomainSocketAddress remoteAddress0() {
66 return remote;
67 }
68
69 @Override
70 protected void doBind(SocketAddress localAddress) throws Exception {
71 socket.bind(localAddress);
72 local = (DomainSocketAddress) localAddress;
73 }
74
75 @Override
76 public KQueueDomainSocketChannelConfig config() {
77 return config;
78 }
79
80 @Override
81 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
82 if (super.doConnect(remoteAddress, localAddress)) {
83 local = localAddress != null ? (DomainSocketAddress) localAddress : socket.localDomainSocketAddress();
84 remote = (DomainSocketAddress) remoteAddress;
85 return true;
86 }
87 return false;
88 }
89
90 @Override
91 public DomainSocketAddress remoteAddress() {
92 return (DomainSocketAddress) super.remoteAddress();
93 }
94
95 @Override
96 public DomainSocketAddress localAddress() {
97 return (DomainSocketAddress) super.localAddress();
98 }
99
100 @Override
101 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
102 Object msg = in.current();
103 if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
104
105 in.remove();
106 return 1;
107 }
108 return super.doWriteSingle(in);
109 }
110
111 @Override
112 protected Object filterOutboundMessage(Object msg) {
113 if (msg instanceof FileDescriptor) {
114 return msg;
115 }
116 return super.filterOutboundMessage(msg);
117 }
118
119
120
121
122
123 @UnstableApi
124 public PeerCredentials peerCredentials() throws IOException {
125 return socket.getPeerCredentials();
126 }
127
128 private final class KQueueDomainUnsafe extends KQueueStreamUnsafe {
129 @Override
130 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
131 switch (config().getReadMode()) {
132 case BYTES:
133 super.readReady(allocHandle);
134 break;
135 case FILE_DESCRIPTORS:
136 readReadyFd();
137 break;
138 default:
139 throw new Error();
140 }
141 }
142
143 private void readReadyFd() {
144 if (socket.isInputShutdown()) {
145 super.clearReadFilter0();
146 return;
147 }
148 final ChannelConfig config = config();
149 final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
150
151 final ChannelPipeline pipeline = pipeline();
152 allocHandle.reset(config);
153 readReadyBefore();
154
155 try {
156 readLoop: do {
157
158
159
160 int recvFd = socket.recvFd();
161 switch(recvFd) {
162 case 0:
163 allocHandle.lastBytesRead(0);
164 break readLoop;
165 case -1:
166 allocHandle.lastBytesRead(-1);
167 close(voidPromise());
168 return;
169 default:
170 allocHandle.lastBytesRead(1);
171 allocHandle.incMessagesRead(1);
172 readPending = false;
173 pipeline.fireChannelRead(new FileDescriptor(recvFd));
174 break;
175 }
176 } while (allocHandle.continueReading());
177
178 allocHandle.readComplete();
179 pipeline.fireChannelReadComplete();
180 } catch (Throwable t) {
181 allocHandle.readComplete();
182 pipeline.fireChannelReadComplete();
183 pipeline.fireExceptionCaught(t);
184 } finally {
185 readReadyFinally(config);
186 }
187 }
188 }
189 }