1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package io.netty.handler.timeout;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelDuplexHandler;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInitializer;
25 import io.netty.channel.ChannelOutboundHandlerAdapter;
26 import io.netty.channel.ChannelPromise;
27
28 import java.util.concurrent.ScheduledFuture;
29 import java.util.concurrent.TimeUnit;
30
31 /**
32 * Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.
33 *
34 * <pre>
35 * // The connection is closed when a write operation cannot finish in 30 seconds.
36 *
37 * public class MyChannelInitializer extends {@link ChannelInitializer}<{@link Channel}> {
38 * public void initChannel({@link Channel} channel) {
39 * channel.pipeline().addLast("writeTimeoutHandler", new {@link WriteTimeoutHandler}(30);
40 * channel.pipeline().addLast("myHandler", new MyHandler());
41 * }
42 * }
43 *
44 * // Handler should handle the {@link WriteTimeoutException}.
45 * public class MyHandler extends {@link ChannelDuplexHandler} {
46 * {@code @Override}
47 * public void exceptionCaught({@link ChannelHandlerContext} ctx, {@link Throwable} cause)
48 * throws {@link Exception} {
49 * if (cause instanceof {@link WriteTimeoutException}) {
50 * // do something
51 * } else {
52 * super.exceptionCaught(ctx, cause);
53 * }
54 * }
55 * }
56 *
57 * {@link ServerBootstrap} bootstrap = ...;
58 * ...
59 * bootstrap.childHandler(new MyChannelInitializer());
60 * ...
61 * </pre>
62 * @see ReadTimeoutHandler
63 * @see IdleStateHandler
64 */
65 public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
66 private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
67
68 private final long timeoutNanos;
69
70 /**
71 * A doubly-linked list to track all WriteTimeoutTasks
72 */
73 private WriteTimeoutTask lastTask;
74
75 private boolean closed;
76
77 /**
78 * Creates a new instance.
79 *
80 * @param timeoutSeconds
81 * write timeout in seconds
82 */
83 public WriteTimeoutHandler(int timeoutSeconds) {
84 this(timeoutSeconds, TimeUnit.SECONDS);
85 }
86
87 /**
88 * Creates a new instance.
89 *
90 * @param timeout
91 * write timeout
92 * @param unit
93 * the {@link TimeUnit} of {@code timeout}
94 */
95 public WriteTimeoutHandler(long timeout, TimeUnit unit) {
96 if (unit == null) {
97 throw new NullPointerException("unit");
98 }
99
100 if (timeout <= 0) {
101 timeoutNanos = 0;
102 } else {
103 timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
104 }
105 }
106
107 @Override
108 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
109 scheduleTimeout(ctx, promise);
110 ctx.write(msg, promise);
111 }
112
113 @Override
114 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
115 WriteTimeoutTask task = lastTask;
116 lastTask = null;
117 while (task != null) {
118 task.scheduledFuture.cancel(false);
119 WriteTimeoutTask prev = task.prev;
120 task.prev = null;
121 task.next = null;
122 task = prev;
123 }
124 }
125
126 private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
127 // Schedule a timeout.
128 final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
129 task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
130
131 if (!task.scheduledFuture.isDone()) {
132 addWriteTimeoutTask(task);
133
134 // Cancel the scheduled timeout if the flush promise is complete.
135 promise.addListener(task);
136 }
137 }
138
139 private void addWriteTimeoutTask(WriteTimeoutTask task) {
140 if (lastTask == null) {
141 lastTask = task;
142 } else {
143 lastTask.next = task;
144 task.prev = lastTask;
145 lastTask = task;
146 }
147 }
148
149 private void removeWriteTimeoutTask(WriteTimeoutTask task) {
150 if (task == lastTask) {
151 // task is the tail of list
152 assert task.next == null;
153 lastTask = lastTask.prev;
154 if (lastTask != null) {
155 lastTask.next = null;
156 }
157 } else if (task.prev == null && task.next == null) {
158 // Since task is not lastTask, then it has been removed or not been added.
159 return;
160 } else if (task.prev == null) {
161 // task is the head of list and the list has at least 2 nodes
162 task.next.prev = null;
163 } else {
164 task.prev.next = task.next;
165 task.next.prev = task.prev;
166 }
167 task.prev = null;
168 task.next = null;
169 }
170
171 /**
172 * Is called when a write timeout was detected
173 */
174 protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
175 if (!closed) {
176 ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
177 ctx.close();
178 closed = true;
179 }
180 }
181
182 private final class WriteTimeoutTask implements Runnable, ChannelFutureListener {
183
184 private final ChannelHandlerContext ctx;
185 private final ChannelPromise promise;
186
187 // WriteTimeoutTask is also a node of a doubly-linked list
188 WriteTimeoutTask prev;
189 WriteTimeoutTask next;
190
191 ScheduledFuture<?> scheduledFuture;
192
193 WriteTimeoutTask(ChannelHandlerContext ctx, ChannelPromise promise) {
194 this.ctx = ctx;
195 this.promise = promise;
196 }
197
198 @Override
199 public void run() {
200 // Was not written yet so issue a write timeout
201 // The promise itself will be failed with a ClosedChannelException once the close() was issued
202 // See https://github.com/netty/netty/issues/2159
203 if (!promise.isDone()) {
204 try {
205 writeTimedOut(ctx);
206 } catch (Throwable t) {
207 ctx.fireExceptionCaught(t);
208 }
209 }
210 removeWriteTimeoutTask(this);
211 }
212
213 @Override
214 public void operationComplete(ChannelFuture future) throws Exception {
215 // scheduledFuture has already be set when reaching here
216 scheduledFuture.cancel(false);
217 removeWriteTimeoutTask(this);
218 }
219 }
220 }