1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelHandlerContext;
22 import io.netty.channel.ChannelOption;
23 import io.netty.channel.SimpleChannelInboundHandler;
24 import io.netty.channel.socket.ChannelInputShutdownEvent;
25 import io.netty.channel.socket.DuplexChannel;
26 import org.junit.jupiter.api.Test;
27 import org.junit.jupiter.api.TestInfo;
28 import org.junit.jupiter.api.Timeout;
29
30 import java.io.IOException;
31 import java.net.SocketAddress;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import static org.junit.jupiter.api.Assertions.assertEquals;
39 import static org.junit.jupiter.api.Assertions.assertFalse;
40 import static org.junit.jupiter.api.Assertions.assertTrue;
41
42 public abstract class AbstractSocketShutdownOutputByPeerTest<Socket> extends AbstractServerSocketTest {
43
44 @Test
45 @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
46 public void testShutdownOutput(TestInfo testInfo) throws Throwable {
47 run(testInfo, new Runner<ServerBootstrap>() {
48 @Override
49 public void run(ServerBootstrap serverBootstrap) throws Throwable {
50 testShutdownOutput(serverBootstrap);
51 }
52 });
53 }
54
55 public void testShutdownOutput(ServerBootstrap sb) throws Throwable {
56 TestHandler h = new TestHandler();
57 Socket s = newSocket();
58 Channel sc = null;
59 try {
60 sc = sb.childHandler(h).childOption(ChannelOption.ALLOW_HALF_CLOSURE, true).bind().sync().channel();
61
62 connect(s, sc.localAddress());
63 write(s, 1);
64
65 assertEquals(1, (int) h.queue.take());
66
67 assertTrue(h.ch.isOpen());
68 assertTrue(h.ch.isActive());
69 assertFalse(h.ch.isInputShutdown());
70 assertFalse(h.ch.isOutputShutdown());
71
72 shutdownOutput(s);
73
74 h.halfClosure.await();
75
76 assertTrue(h.ch.isOpen());
77 assertTrue(h.ch.isActive());
78 assertTrue(h.ch.isInputShutdown());
79 assertFalse(h.ch.isOutputShutdown());
80
81 while (h.closure.getCount() != 1 && h.halfClosureCount.intValue() != 1) {
82 Thread.sleep(100);
83 }
84 } finally {
85 if (sc != null) {
86 sc.close();
87 }
88 close(s);
89 }
90 }
91
92 @Test
93 @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
94 public void testShutdownOutputWithoutOption(TestInfo testInfo) throws Throwable {
95 run(testInfo, new Runner<ServerBootstrap>() {
96 @Override
97 public void run(ServerBootstrap serverBootstrap) throws Throwable {
98 testShutdownOutputWithoutOption(serverBootstrap);
99 }
100 });
101 }
102
103 public void testShutdownOutputWithoutOption(ServerBootstrap sb) throws Throwable {
104 TestHandler h = new TestHandler();
105 Socket s = newSocket();
106 Channel sc = null;
107 try {
108 sc = sb.childHandler(h).bind().sync().channel();
109
110 connect(s, sc.localAddress());
111 write(s, 1);
112
113 assertEquals(1, (int) h.queue.take());
114
115 assertTrue(h.ch.isOpen());
116 assertTrue(h.ch.isActive());
117 assertFalse(h.ch.isInputShutdown());
118 assertFalse(h.ch.isOutputShutdown());
119
120 shutdownOutput(s);
121
122 h.closure.await();
123
124 assertFalse(h.ch.isOpen());
125 assertFalse(h.ch.isActive());
126 assertTrue(h.ch.isInputShutdown());
127 assertTrue(h.ch.isOutputShutdown());
128
129 while (h.halfClosure.getCount() != 1 && h.halfClosureCount.intValue() != 0) {
130 Thread.sleep(100);
131 }
132 } finally {
133 if (sc != null) {
134 sc.close();
135 }
136 close(s);
137 }
138 }
139
140 protected abstract void shutdownOutput(Socket s) throws IOException;
141
142 protected abstract void connect(Socket s, SocketAddress address) throws IOException;
143
144 protected abstract void close(Socket s) throws IOException;
145
146 protected abstract void write(Socket s, int data) throws IOException;
147
148 protected abstract Socket newSocket();
149
150 private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
151 volatile DuplexChannel ch;
152 final BlockingQueue<Byte> queue = new LinkedBlockingQueue<Byte>();
153 final CountDownLatch halfClosure = new CountDownLatch(1);
154 final CountDownLatch closure = new CountDownLatch(1);
155 final AtomicInteger halfClosureCount = new AtomicInteger();
156
157 @Override
158 public void channelActive(ChannelHandlerContext ctx) throws Exception {
159 ch = (DuplexChannel) ctx.channel();
160 }
161
162 @Override
163 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
164 closure.countDown();
165 }
166
167 @Override
168 public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
169 queue.offer(msg.readByte());
170 }
171
172 @Override
173 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
174 if (evt instanceof ChannelInputShutdownEvent) {
175 halfClosureCount.incrementAndGet();
176 halfClosure.countDown();
177 }
178 }
179 }
180 }