1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AbstractChannel;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelConfig;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPipeline;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.DefaultChannelConfig;
27 import io.netty.channel.EventLoop;
28 import io.netty.channel.PreferHeapByteBufAllocator;
29 import io.netty.channel.RecvByteBufAllocator;
30 import io.netty.channel.SingleThreadEventLoop;
31 import io.netty.util.ReferenceCountUtil;
32 import io.netty.util.concurrent.Future;
33 import io.netty.util.concurrent.SingleThreadEventExecutor;
34 import io.netty.util.internal.InternalThreadLocalMap;
35 import io.netty.util.internal.PlatformDependent;
36 import io.netty.util.internal.logging.InternalLogger;
37 import io.netty.util.internal.logging.InternalLoggerFactory;
38
39 import java.net.ConnectException;
40 import java.net.SocketAddress;
41 import java.nio.channels.AlreadyConnectedException;
42 import java.nio.channels.ClosedChannelException;
43 import java.nio.channels.ConnectionPendingException;
44 import java.nio.channels.NotYetConnectedException;
45 import java.util.Queue;
46 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
47
48
49
50
51 public class LocalChannel extends AbstractChannel {
52 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannel.class);
53 @SuppressWarnings({ "rawtypes" })
54 private static final AtomicReferenceFieldUpdater<LocalChannel, Future> FINISH_READ_FUTURE_UPDATER =
55 AtomicReferenceFieldUpdater.newUpdater(LocalChannel.class, Future.class, "finishReadFuture");
56 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57 private static final int MAX_READER_STACK_DEPTH = 8;
58
59 private enum State { OPEN, BOUND, CONNECTED, CLOSED }
60
61 private final ChannelConfig config = new DefaultChannelConfig(this);
62
63 final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
64 private final Runnable readTask = new Runnable() {
65 @Override
66 public void run() {
67
68 if (!inboundBuffer.isEmpty()) {
69 readInbound();
70 }
71 }
72 };
73
74 private final Runnable shutdownHook = new Runnable() {
75 @Override
76 public void run() {
77 unsafe().close(unsafe().voidPromise());
78 }
79 };
80
81 private volatile State state;
82 private volatile LocalChannel peer;
83 private volatile LocalAddress localAddress;
84 private volatile LocalAddress remoteAddress;
85 private volatile ChannelPromise connectPromise;
86 private volatile boolean readInProgress;
87 private volatile boolean writeInProgress;
88 private volatile Future<?> finishReadFuture;
89
90 public LocalChannel() {
91 super(null);
92 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
93 }
94
95 protected LocalChannel(LocalServerChannel parent, LocalChannel peer) {
96 super(parent);
97 config().setAllocator(new PreferHeapByteBufAllocator(config.getAllocator()));
98 this.peer = peer;
99 localAddress = parent.localAddress();
100 remoteAddress = peer.localAddress();
101 }
102
103 @Override
104 public ChannelMetadata metadata() {
105 return METADATA;
106 }
107
108 @Override
109 public ChannelConfig config() {
110 return config;
111 }
112
113 @Override
114 public LocalServerChannel parent() {
115 return (LocalServerChannel) super.parent();
116 }
117
118 @Override
119 public LocalAddress localAddress() {
120 return (LocalAddress) super.localAddress();
121 }
122
123 @Override
124 public LocalAddress remoteAddress() {
125 return (LocalAddress) super.remoteAddress();
126 }
127
128 @Override
129 public boolean isOpen() {
130 return state != State.CLOSED;
131 }
132
133 @Override
134 public boolean isActive() {
135 return state == State.CONNECTED;
136 }
137
138 @Override
139 protected AbstractUnsafe newUnsafe() {
140 return new LocalUnsafe();
141 }
142
143 @Override
144 protected boolean isCompatible(EventLoop loop) {
145 return loop instanceof SingleThreadEventLoop;
146 }
147
148 @Override
149 protected SocketAddress localAddress0() {
150 return localAddress;
151 }
152
153 @Override
154 protected SocketAddress remoteAddress0() {
155 return remoteAddress;
156 }
157
158 @Override
159 protected void doRegister() throws Exception {
160
161
162
163
164
165 if (peer != null && parent() != null) {
166
167
168 final LocalChannel peer = this.peer;
169 state = State.CONNECTED;
170
171 peer.remoteAddress = parent() == null ? null : parent().localAddress();
172 peer.state = State.CONNECTED;
173
174
175
176
177
178 peer.eventLoop().execute(new Runnable() {
179 @Override
180 public void run() {
181 ChannelPromise promise = peer.connectPromise;
182
183
184
185 if (promise != null && promise.trySuccess()) {
186 peer.pipeline().fireChannelActive();
187 }
188 }
189 });
190 }
191 ((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
192 }
193
194 @Override
195 protected void doBind(SocketAddress localAddress) throws Exception {
196 this.localAddress =
197 LocalChannelRegistry.register(this, this.localAddress,
198 localAddress);
199 state = State.BOUND;
200 }
201
202 @Override
203 protected void doDisconnect() throws Exception {
204 doClose();
205 }
206
207 @Override
208 protected void doClose() throws Exception {
209 final LocalChannel peer = this.peer;
210 State oldState = state;
211 try {
212 if (oldState != State.CLOSED) {
213
214 if (localAddress != null) {
215 if (parent() == null) {
216 LocalChannelRegistry.unregister(localAddress);
217 }
218 localAddress = null;
219 }
220
221
222
223 state = State.CLOSED;
224
225
226 if (writeInProgress && peer != null) {
227 finishPeerRead(peer);
228 }
229
230 ChannelPromise promise = connectPromise;
231 if (promise != null) {
232
233 promise.tryFailure(new ClosedChannelException());
234 connectPromise = null;
235 }
236 }
237
238 if (peer != null) {
239 this.peer = null;
240
241
242
243 EventLoop peerEventLoop = peer.eventLoop();
244 final boolean peerIsActive = peer.isActive();
245 try {
246 peerEventLoop.execute(new Runnable() {
247 @Override
248 public void run() {
249 peer.tryClose(peerIsActive);
250 }
251 });
252 } catch (Throwable cause) {
253 logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
254 this, peer, cause);
255 if (peerEventLoop.inEventLoop()) {
256 peer.releaseInboundBuffers();
257 } else {
258
259
260 peer.close();
261 }
262 PlatformDependent.throwException(cause);
263 }
264 }
265 } finally {
266
267 if (oldState != null && oldState != State.CLOSED) {
268
269
270
271
272 releaseInboundBuffers();
273 }
274 }
275 }
276
277 private void tryClose(boolean isActive) {
278 if (isActive) {
279 unsafe().close(unsafe().voidPromise());
280 } else {
281 releaseInboundBuffers();
282 }
283 }
284
285 @Override
286 protected void doDeregister() throws Exception {
287
288 ((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
289 }
290
291 private void readInbound() {
292 RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
293 handle.reset(config());
294 ChannelPipeline pipeline = pipeline();
295 do {
296 Object received = inboundBuffer.poll();
297 if (received == null) {
298 break;
299 }
300 if (received instanceof ByteBuf && inboundBuffer.peek() instanceof ByteBuf) {
301 ByteBuf msg = (ByteBuf) received;
302 ByteBuf output = handle.allocate(alloc());
303 if (msg.readableBytes() < output.writableBytes()) {
304
305 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
306 msg.release();
307 while ((received = inboundBuffer.peek()) instanceof ByteBuf &&
308 ((ByteBuf) received).readableBytes() < output.writableBytes()) {
309 inboundBuffer.poll();
310 msg = (ByteBuf) received;
311 output.writeBytes(msg, msg.readerIndex(), msg.readableBytes());
312 msg.release();
313 }
314 handle.lastBytesRead(output.readableBytes());
315 received = output;
316 } else {
317
318 handle.lastBytesRead(output.capacity());
319 output.release();
320 }
321 }
322 handle.incMessagesRead(1);
323 pipeline.fireChannelRead(received);
324 } while (handle.continueReading());
325 handle.readComplete();
326 pipeline.fireChannelReadComplete();
327 }
328
329 @Override
330 protected void doBeginRead() throws Exception {
331 if (readInProgress) {
332 return;
333 }
334
335 Queue<Object> inboundBuffer = this.inboundBuffer;
336 if (inboundBuffer.isEmpty()) {
337 readInProgress = true;
338 return;
339 }
340
341 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
342 final int stackDepth = threadLocals.localChannelReaderStackDepth();
343 if (stackDepth < MAX_READER_STACK_DEPTH) {
344 threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
345 try {
346 readInbound();
347 } finally {
348 threadLocals.setLocalChannelReaderStackDepth(stackDepth);
349 }
350 } else {
351 try {
352 eventLoop().execute(readTask);
353 } catch (Throwable cause) {
354 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
355 close();
356 peer.close();
357 PlatformDependent.throwException(cause);
358 }
359 }
360 }
361
362 @Override
363 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
364 switch (state) {
365 case OPEN:
366 case BOUND:
367 throw new NotYetConnectedException();
368 case CLOSED:
369 throw new ClosedChannelException();
370 case CONNECTED:
371 break;
372 }
373
374 final LocalChannel peer = this.peer;
375
376 writeInProgress = true;
377 try {
378 ClosedChannelException exception = null;
379 for (;;) {
380 Object msg = in.current();
381 if (msg == null) {
382 break;
383 }
384 try {
385
386
387 if (peer.state == State.CONNECTED) {
388 peer.inboundBuffer.add(ReferenceCountUtil.retain(msg));
389 in.remove();
390 } else {
391 if (exception == null) {
392 exception = new ClosedChannelException();
393 }
394 in.remove(exception);
395 }
396 } catch (Throwable cause) {
397 in.remove(cause);
398 }
399 }
400 } finally {
401
402
403
404
405
406 writeInProgress = false;
407 }
408
409 finishPeerRead(peer);
410 }
411
412 private void finishPeerRead(final LocalChannel peer) {
413
414 if (peer.eventLoop() == eventLoop() && !peer.writeInProgress) {
415 finishPeerRead0(peer);
416 } else {
417 runFinishPeerReadTask(peer);
418 }
419 }
420
421 private void runFinishPeerReadTask(final LocalChannel peer) {
422
423
424 final Runnable finishPeerReadTask = new Runnable() {
425 @Override
426 public void run() {
427 finishPeerRead0(peer);
428 }
429 };
430 try {
431 if (peer.writeInProgress) {
432 peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
433 } else {
434 peer.eventLoop().execute(finishPeerReadTask);
435 }
436 } catch (Throwable cause) {
437 logger.warn("Closing Local channels {}-{} because exception occurred!", this, peer, cause);
438 close();
439 peer.close();
440 PlatformDependent.throwException(cause);
441 }
442 }
443
444 private void releaseInboundBuffers() {
445 assert eventLoop() == null || eventLoop().inEventLoop();
446 readInProgress = false;
447 Queue<Object> inboundBuffer = this.inboundBuffer;
448 Object msg;
449 while ((msg = inboundBuffer.poll()) != null) {
450 ReferenceCountUtil.release(msg);
451 }
452 }
453
454 private void finishPeerRead0(LocalChannel peer) {
455 Future<?> peerFinishReadFuture = peer.finishReadFuture;
456 if (peerFinishReadFuture != null) {
457 if (!peerFinishReadFuture.isDone()) {
458 runFinishPeerReadTask(peer);
459 return;
460 } else {
461
462 FINISH_READ_FUTURE_UPDATER.compareAndSet(peer, peerFinishReadFuture, null);
463 }
464 }
465
466
467 if (peer.readInProgress && !peer.inboundBuffer.isEmpty()) {
468 peer.readInProgress = false;
469 peer.readInbound();
470 }
471 }
472
473 private class LocalUnsafe extends AbstractUnsafe {
474
475 @Override
476 public void connect(final SocketAddress remoteAddress,
477 SocketAddress localAddress, final ChannelPromise promise) {
478 if (!promise.setUncancellable() || !ensureOpen(promise)) {
479 return;
480 }
481
482 if (state == State.CONNECTED) {
483 Exception cause = new AlreadyConnectedException();
484 safeSetFailure(promise, cause);
485 pipeline().fireExceptionCaught(cause);
486 return;
487 }
488
489 if (connectPromise != null) {
490 throw new ConnectionPendingException();
491 }
492
493 connectPromise = promise;
494
495 if (state != State.BOUND) {
496
497 if (localAddress == null) {
498 localAddress = new LocalAddress(LocalChannel.this);
499 }
500 }
501
502 if (localAddress != null) {
503 try {
504 doBind(localAddress);
505 } catch (Throwable t) {
506 safeSetFailure(promise, t);
507 close(voidPromise());
508 return;
509 }
510 }
511
512 Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
513 if (!(boundChannel instanceof LocalServerChannel)) {
514 Exception cause = new ConnectException("connection refused: " + remoteAddress);
515 safeSetFailure(promise, cause);
516 close(voidPromise());
517 return;
518 }
519
520 LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
521 peer = serverChannel.serve(LocalChannel.this);
522 }
523 }
524 }