1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelOutboundBuffer;
21 import io.netty.channel.ChannelPipeline;
22 import io.netty.channel.unix.DomainSocketAddress;
23 import io.netty.channel.unix.DomainSocketChannel;
24 import io.netty.channel.unix.FileDescriptor;
25 import io.netty.channel.unix.PeerCredentials;
26 import io.netty.util.internal.UnstableApi;
27
28 import java.io.IOException;
29 import java.net.SocketAddress;
30
31 import static io.netty.channel.epoll.LinuxSocket.newSocketDomain;
32
33 public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel implements DomainSocketChannel {
34 private final EpollDomainSocketChannelConfig config = new EpollDomainSocketChannelConfig(this);
35
36 private volatile DomainSocketAddress local;
37 private volatile DomainSocketAddress remote;
38
39 public EpollDomainSocketChannel() {
40 super(newSocketDomain(), false);
41 }
42
43 EpollDomainSocketChannel(Channel parent, FileDescriptor fd) {
44 this(parent, new LinuxSocket(fd.intValue()));
45 }
46
47 public EpollDomainSocketChannel(int fd) {
48 super(fd);
49 }
50
51 public EpollDomainSocketChannel(Channel parent, LinuxSocket fd) {
52 super(parent, fd);
53 local = fd.localDomainSocketAddress();
54 remote = fd.remoteDomainSocketAddress();
55 }
56
57 public EpollDomainSocketChannel(int fd, boolean active) {
58 super(new LinuxSocket(fd), active);
59 }
60
61 @Override
62 protected AbstractEpollUnsafe newUnsafe() {
63 return new EpollDomainUnsafe();
64 }
65
66 @Override
67 protected DomainSocketAddress localAddress0() {
68 return local;
69 }
70
71 @Override
72 protected DomainSocketAddress remoteAddress0() {
73 return remote;
74 }
75
76 @Override
77 protected void doBind(SocketAddress localAddress) throws Exception {
78 socket.bind(localAddress);
79 local = (DomainSocketAddress) localAddress;
80 }
81
82 @Override
83 public EpollDomainSocketChannelConfig config() {
84 return config;
85 }
86
87 @Override
88 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
89 if (super.doConnect(remoteAddress, localAddress)) {
90 local = localAddress != null ? (DomainSocketAddress) localAddress : socket.localDomainSocketAddress();
91 remote = (DomainSocketAddress) remoteAddress;
92 return true;
93 }
94 return false;
95 }
96
97 @Override
98 public DomainSocketAddress remoteAddress() {
99 return (DomainSocketAddress) super.remoteAddress();
100 }
101
102 @Override
103 public DomainSocketAddress localAddress() {
104 return (DomainSocketAddress) super.localAddress();
105 }
106
107 @Override
108 protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
109 Object msg = in.current();
110 if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
111
112 in.remove();
113 return 1;
114 }
115 return super.doWriteSingle(in);
116 }
117
118 @Override
119 protected Object filterOutboundMessage(Object msg) {
120 if (msg instanceof FileDescriptor) {
121 return msg;
122 }
123 return super.filterOutboundMessage(msg);
124 }
125
126
127
128
129
130 @UnstableApi
131 public PeerCredentials peerCredentials() throws IOException {
132 return socket.getPeerCredentials();
133 }
134
135 private final class EpollDomainUnsafe extends EpollStreamUnsafe {
136 @Override
137 void epollInReady() {
138 switch (config().getReadMode()) {
139 case BYTES:
140 super.epollInReady();
141 break;
142 case FILE_DESCRIPTORS:
143 epollInReadFd();
144 break;
145 default:
146 throw new Error();
147 }
148 }
149
150 private void epollInReadFd() {
151 if (socket.isInputShutdown()) {
152 clearEpollIn0();
153 return;
154 }
155 final ChannelConfig config = config();
156 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
157 allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
158
159 final ChannelPipeline pipeline = pipeline();
160 allocHandle.reset(config);
161 epollInBefore();
162
163 try {
164 readLoop: do {
165
166
167
168 allocHandle.lastBytesRead(socket.recvFd());
169 switch(allocHandle.lastBytesRead()) {
170 case 0:
171 break readLoop;
172 case -1:
173 close(voidPromise());
174 return;
175 default:
176 allocHandle.incMessagesRead(1);
177 readPending = false;
178 pipeline.fireChannelRead(new FileDescriptor(allocHandle.lastBytesRead()));
179 break;
180 }
181 } while (allocHandle.continueReading());
182
183 allocHandle.readComplete();
184 pipeline.fireChannelReadComplete();
185 } catch (Throwable t) {
186 allocHandle.readComplete();
187 pipeline.fireChannelReadComplete();
188 pipeline.fireExceptionCaught(t);
189 } finally {
190 epollInFinally(config);
191 }
192 }
193 }
194 }