查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.ChannelConfig;
20  import io.netty.channel.ChannelOutboundBuffer;
21  import io.netty.channel.ChannelPipeline;
22  import io.netty.channel.unix.DomainSocketAddress;
23  import io.netty.channel.unix.DomainSocketChannel;
24  import io.netty.channel.unix.FileDescriptor;
25  import io.netty.channel.unix.PeerCredentials;
26  import io.netty.util.internal.UnstableApi;
27  
28  import java.io.IOException;
29  import java.net.SocketAddress;
30  
31  import static io.netty.channel.kqueue.BsdSocket.newSocketDomain;
32  
33  @UnstableApi
34  public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel implements DomainSocketChannel {
35      private final KQueueDomainSocketChannelConfig config = new KQueueDomainSocketChannelConfig(this);
36  
37      private volatile DomainSocketAddress local;
38      private volatile DomainSocketAddress remote;
39  
40      public KQueueDomainSocketChannel() {
41          super(null, newSocketDomain(), false);
42      }
43  
44      public KQueueDomainSocketChannel(int fd) {
45          this(null, new BsdSocket(fd));
46      }
47  
48      KQueueDomainSocketChannel(Channel parent, BsdSocket fd) {
49          super(parent, fd, true);
50          local = fd.localDomainSocketAddress();
51          remote = fd.remoteDomainSocketAddress();
52      }
53  
54      @Override
55      protected AbstractKQueueUnsafe newUnsafe() {
56          return new KQueueDomainUnsafe();
57      }
58  
59      @Override
60      protected DomainSocketAddress localAddress0() {
61          return local;
62      }
63  
64      @Override
65      protected DomainSocketAddress remoteAddress0() {
66          return remote;
67      }
68  
69      @Override
70      protected void doBind(SocketAddress localAddress) throws Exception {
71          socket.bind(localAddress);
72          local = (DomainSocketAddress) localAddress;
73      }
74  
75      @Override
76      public KQueueDomainSocketChannelConfig config() {
77          return config;
78      }
79  
80      @Override
81      protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
82          if (super.doConnect(remoteAddress, localAddress)) {
83              local = localAddress != null ? (DomainSocketAddress) localAddress : socket.localDomainSocketAddress();
84              remote = (DomainSocketAddress) remoteAddress;
85              return true;
86          }
87          return false;
88      }
89  
90      @Override
91      public DomainSocketAddress remoteAddress() {
92          return (DomainSocketAddress) super.remoteAddress();
93      }
94  
95      @Override
96      public DomainSocketAddress localAddress() {
97          return (DomainSocketAddress) super.localAddress();
98      }
99  
100     @Override
101     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
102         Object msg = in.current();
103         if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
104             // File descriptor was written, so remove it.
105             in.remove();
106             return 1;
107         }
108         return super.doWriteSingle(in);
109     }
110 
111     @Override
112     protected Object filterOutboundMessage(Object msg) {
113         if (msg instanceof FileDescriptor) {
114             return msg;
115         }
116         return super.filterOutboundMessage(msg);
117     }
118 
119     /**
120      * Returns the unix credentials (uid, gid, pid) of the peer
121      * <a href=https://man7.org/linux/man-pages/man7/socket.7.html>SO_PEERCRED</a>
122      */
123     @UnstableApi
124     public PeerCredentials peerCredentials() throws IOException {
125         return socket.getPeerCredentials();
126     }
127 
128     private final class KQueueDomainUnsafe extends KQueueStreamUnsafe {
129         @Override
130         void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
131             switch (config().getReadMode()) {
132                 case BYTES:
133                     super.readReady(allocHandle);
134                     break;
135                 case FILE_DESCRIPTORS:
136                     readReadyFd();
137                     break;
138                 default:
139                     throw new Error();
140             }
141         }
142 
143         private void readReadyFd() {
144             if (socket.isInputShutdown()) {
145                 super.clearReadFilter0();
146                 return;
147             }
148             final ChannelConfig config = config();
149             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
150 
151             final ChannelPipeline pipeline = pipeline();
152             allocHandle.reset(config);
153             readReadyBefore();
154 
155             try {
156                 readLoop: do {
157                     // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
158                     // KQueueRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
159                     // enabled.
160                     int recvFd = socket.recvFd();
161                     switch(recvFd) {
162                         case 0:
163                             allocHandle.lastBytesRead(0);
164                             break readLoop;
165                         case -1:
166                             allocHandle.lastBytesRead(-1);
167                             close(voidPromise());
168                             return;
169                         default:
170                             allocHandle.lastBytesRead(1);
171                             allocHandle.incMessagesRead(1);
172                             readPending = false;
173                             pipeline.fireChannelRead(new FileDescriptor(recvFd));
174                             break;
175                     }
176                 } while (allocHandle.continueReading());
177 
178                 allocHandle.readComplete();
179                 pipeline.fireChannelReadComplete();
180             } catch (Throwable t) {
181                 allocHandle.readComplete();
182                 pipeline.fireChannelReadComplete();
183                 pipeline.fireExceptionCaught(t);
184             } finally {
185                 readReadyFinally(config);
186             }
187         }
188     }
189 }