1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * https://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelDuplexHandler;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInitializer;
25 import io.netty.channel.ChannelOutboundHandlerAdapter;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.util.concurrent.Future;
28 import io.netty.util.internal.ObjectUtil;
29
30 import java.util.concurrent.TimeUnit;
31
32 /**
33 * Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
34 *
35 * <pre>
36 * // The connection is closed when a write operation cannot finish in 30 seconds.
37 *
38 * public class MyChannelInitializer extends {@link ChannelInitializer}&lt;{@link Channel}&gt; {
39 * public void initChannel({@link Channel} channel) {
40 * channel.pipeline().addLast("writeTimeoutHandler", new {@link WriteTimeoutHandler}(30));
41 * channel.pipeline().addLast("myHandler", new MyHandler());
42 * }
43 * }
44 *
45 * // Handler should handle the {@link WriteTimeoutException}.
46 * public class MyHandler extends {@link ChannelDuplexHandler} {
47 * {@code @Override}
48 * public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
49 * throws {@link Exception} {
50 * if (cause instanceof {@link WriteTimeoutException}) {
51 * // do something
52 * } else {
53 * super.exceptionCaught(ctx, cause);
54 * }
55 * }
56 * }
57 *
58 * {@link ServerBootstrap} bootstrap = ...;
59 * ...
60 * bootstrap.childHandler(new MyChannelInitializer());
61 * ...
62 * </pre>
63 * @see ReadTimeoutHandler
64 * @see IdleStateHandler
65 */
66 public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
67 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
68
69 private final long timeoutNanos;
70
71 /**
72 * A doubly-linked list to track all WriteTimeoutTasks
73 */
74 private WriteTimeoutTask lastTask;
75
76 private boolean closed;
77
78 /**
79 * Creates a new instance.
80 *
81 * @param timeoutSeconds
82 * write timeout in seconds
83 */
84 public WriteTimeoutHandler(int timeoutSeconds) {
85 this(timeoutSeconds, TimeUnit.SECONDS);
86 }
87
88 /**
89 * Creates a new instance.
90 *
91 * @param timeout
92 * write timeout
93 * @param unit
94 * the {@link TimeUnit} of {@code timeout}
95 */
96 public WriteTimeoutHandler(long timeout, TimeUnit unit) {
97 ObjectUtil.checkNotNull(unit, "unit");
98
99 if (timeout <= 0) {
100 timeoutNanos = 0;
101 } else {
102 timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
103 }
104 }
105
106 @Override
107 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
108 if (timeoutNanos > 0) {
109 promise = promise.unvoid();
110 scheduleTimeout(ctx, promise);
111 }
112 ctx.write(msg, promise);
113 }
114
115 @Override
116 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
117 assert ctx.executor().inEventLoop();
118 WriteTimeoutTask task = lastTask;
119 lastTask = null;
120 while (task != null) {
121 assert task.ctx.executor().inEventLoop();
122 task.scheduledFuture.cancel(false);
123 WriteTimeoutTask prev = task.prev;
124 task.prev = null;
125 task.next = null;
126 task = prev;
127 }
128 }
129
130 private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
131 // Schedule a timeout.
132 final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
133 task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
134
135 if (!task.scheduledFuture.isDone()) {
136 addWriteTimeoutTask(task);
137
138 // Cancel the scheduled timeout if the flush promise is complete.
139 promise.addListener(task);
140 }
141 }
142
143 private void addWriteTimeoutTask(WriteTimeoutTask task) {
144 assert task.ctx.executor().inEventLoop();
145 if (lastTask != null) {
146 lastTask.next = task;
147 task.prev = lastTask;
148 }
149 lastTask = task;
150 }
151
152 private void removeWriteTimeoutTask(WriteTimeoutTask task) {
153 assert task.ctx.executor().inEventLoop();
154 if (task == lastTask) {
155 // task is the tail of list
156 assert task.next == null;
157 lastTask = lastTask.prev;
158 if (lastTask != null) {
159 lastTask.next = null;
160 }
161 } else if (task.prev == null && task.next == null) {
162 // Since task is not lastTask, then it has been removed or not been added.
163 return;
164 } else if (task.prev == null) {
165 // task is the head of list and the list has at least 2 nodes
166 task.next.prev = null;
167 } else {
168 task.prev.next = task.next;
169 task.next.prev = task.prev;
170 }
171 task.prev = null;
172 task.next = null;
173 }
174
175 /**
176 * Is called when a write timeout was detected
177 */
178 protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
179 if (!closed) {
180 ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
181 ctx.close();
182 closed = true;
183 }
184 }
185
186 private final class WriteTimeoutTask implements Runnable, ChannelFutureListener {
187
188 private final ChannelHandlerContext ctx;
189 private final ChannelPromise promise;
190
191 // WriteTimeoutTask is also a node of a doubly-linked list
192 WriteTimeoutTask prev;
193 WriteTimeoutTask next;
194
195 Future<?> scheduledFuture;
196
197 WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) {
198 this.ctx = ctx;
199 this.promise = promise;
200 }
201
202 @Override
203 public void run() {
204 // Was not written yet so issue a write timeout
205 // The promise itself will be failed with a ClosedChannelException once the close() was issued
206 // See https://github.com/netty/netty/issues/2159
207 if (!promise.isDone()) {
208 try {
209 writeTimedOut(ctx);
210 } catch (Throwable t) {
211 ctx.fireExceptionCaught(t);
212 }
213 }
214 removeWriteTimeoutTask(this);
215 }
216
217 @Override
218 public void operationComplete(ChannelFuture future) throws Exception {
219 // scheduledFuture has already be set when reaching here
220 scheduledFuture.cancel(false);
221
222 // Check if its safe to modify the "doubly-linked-list" that we maintain. If its not we will schedule the
223 // modification so its picked up by the executor..
224 if (ctx.executor().inEventLoop()) {
225 removeWriteTimeoutTask(this);
226 } else {
227 // So let's just pass outself to the executor which will then take care of remove this task
228 // from the doubly-linked list. Schedule ourself is fine as the promise itself is done.
229 //
230 // This fixes https://github.com/netty/netty/issues/11053
231 assert promise.isDone();
232 ctx.executor().execute(this);
233 }
234 }
235 }
236 }