1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.AbstractServerChannel;
19 import io.netty.channel.ChannelConfig;
20 import io.netty.channel.ChannelPipeline;
21 import io.netty.channel.DefaultChannelConfig;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.PreferHeapByteBufAllocator;
24 import io.netty.channel.RecvByteBufAllocator;
25 import io.netty.channel.ServerChannel;
26 import io.netty.channel.ServerChannelRecvByteBufAllocator;
27 import io.netty.channel.SingleThreadEventLoop;
28 import io.netty.util.concurrent.SingleThreadEventExecutor;
29
30 import java.net.SocketAddress;
31 import java.util.ArrayDeque;
32 import java.util.Queue;
33
34
35
36
37 public class LocalServerChannel extends AbstractServerChannel {
38
39 private final ChannelConfig config =
40 new DefaultChannelConfig(this, new ServerChannelRecvByteBufAllocator()) { };
41 private final Queue<Object> inboundBuffer = new ArrayDeque<Object>();
42 private final Runnable shutdownHook = new Runnable() {
43 @Override
44 public void run() {
45 unsafe().close(unsafe().voidPromise());
46 }
47 };
48
49 private volatile int state;
50 private volatile LocalAddress localAddress;
51 private volatile boolean acceptInProgress;
52
53 public LocalServerChannel() {
54 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
55 }
56
57 @Override
58 public ChannelConfig config() {
59 return config;
60 }
61
62 @Override
63 public LocalAddress localAddress() {
64 return (LocalAddress) super.localAddress();
65 }
66
67 @Override
68 public LocalAddress remoteAddress() {
69 return (LocalAddress) super.remoteAddress();
70 }
71
72 @Override
73 public boolean isOpen() {
74 return state < 2;
75 }
76
77 @Override
78 public boolean isActive() {
79 return state == 1;
80 }
81
82 @Override
83 protected boolean isCompatible(EventLoop loop) {
84 return loop instanceof SingleThreadEventLoop;
85 }
86
87 @Override
88 protected SocketAddress localAddress0() {
89 return localAddress;
90 }
91
92 @Override
93 protected void doRegister() throws Exception {
94 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
95 }
96
97 @Override
98 protected void doBind(SocketAddress localAddress) throws Exception {
99 this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
100 state = 1;
101 }
102
103 @Override
104 protected void doClose() throws Exception {
105 if (state <= 1) {
106
107 if (localAddress != null) {
108 LocalChannelRegistry.unregister(localAddress);
109 localAddress = null;
110 }
111 state = 2;
112 }
113 }
114
115 @Override
116 protected void doDeregister() throws Exception {
117 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
118 }
119
120 @Override
121 protected void doBeginRead() throws Exception {
122 if (acceptInProgress) {
123 return;
124 }
125
126 Queue<Object> inboundBuffer = this.inboundBuffer;
127 if (inboundBuffer.isEmpty()) {
128 acceptInProgress = true;
129 return;
130 }
131
132 readInbound();
133 }
134
135 LocalChannel serve(final LocalChannel peer) {
136 final LocalChannel child = newLocalChannel(peer);
137 if (eventLoop().inEventLoop()) {
138 serve0(child);
139 } else {
140 eventLoop().execute(new Runnable() {
141 @Override
142 public void run() {
143 serve0(child);
144 }
145 });
146 }
147 return child;
148 }
149
150 private void readInbound() {
151 RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
152 handle.reset(config());
153 ChannelPipeline pipeline = pipeline();
154 do {
155 Object m = inboundBuffer.poll();
156 if (m == null) {
157 break;
158 }
159 pipeline.fireChannelRead(m);
160 } while (handle.continueReading());
161 handle.readComplete();
162 pipeline.fireChannelReadComplete();
163 }
164
165
166
167
168
169 protected LocalChannel newLocalChannel(LocalChannel peer) {
170 return new LocalChannel(this, peer);
171 }
172
173 private void serve0(final LocalChannel child) {
174 inboundBuffer.add(child);
175 if (acceptInProgress) {
176 acceptInProgress = false;
177
178 readInbound();
179 }
180 }
181 }