查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.timeout;
17  
18  import org.jboss.netty.bootstrap.ServerBootstrap;
19  import org.jboss.netty.channel.ChannelHandler;
20  import org.jboss.netty.channel.ChannelHandler.Sharable;
21  import org.jboss.netty.channel.ChannelHandlerContext;
22  import org.jboss.netty.channel.ChannelPipeline;
23  import org.jboss.netty.channel.ChannelPipelineFactory;
24  import org.jboss.netty.channel.ChannelStateEvent;
25  import org.jboss.netty.channel.Channels;
26  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
27  import org.jboss.netty.channel.MessageEvent;
28  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
29  import org.jboss.netty.util.ExternalResourceReleasable;
30  import org.jboss.netty.util.HashedWheelTimer;
31  import org.jboss.netty.util.Timeout;
32  import org.jboss.netty.util.Timer;
33  import org.jboss.netty.util.TimerTask;
34  
35  import java.util.concurrent.TimeUnit;
36  
37  import static org.jboss.netty.channel.Channels.*;
38  
39  /**
40   * Raises a {@link ReadTimeoutException} when no data was read within a certain
41   * period of time.
42   *
43   * <pre>
44   * public class MyPipelineFactory implements {@link ChannelPipelineFactory} {
45   *
46   *     private final {@link Timer} timer;
47   *     private final {@link ChannelHandler} timeoutHandler;
48   *
49   *     public MyPipelineFactory({@link Timer} timer) {
50   *         this.timer = timer;
51   *         this.timeoutHandler = <b>new {@link ReadTimeoutHandler}(timer, 30), // timer must be shared.</b>
52   *     }
53   *
54   *     public {@link ChannelPipeline} getPipeline() {
55   *         // An example configuration that implements 30-second read timeout:
56   *         return {@link Channels}.pipeline(
57   *             timeoutHandler,
58   *             new MyHandler());
59   *     }
60   * }
61   *
62   * {@link ServerBootstrap} bootstrap = ...;
63   * {@link Timer} timer = new {@link HashedWheelTimer}();
64   * ...
65   * bootstrap.setPipelineFactory(new MyPipelineFactory(timer));
66   * ...
67   * </pre>
68   *
69   * The {@link Timer} which was specified when the {@link ReadTimeoutHandler} is
70   * created should be stopped manually by calling {@link #releaseExternalResources()}
71   * or {@link Timer#stop()} when your application shuts down.
72   * @see WriteTimeoutHandler
73   * @see IdleStateHandler
74   *
75   * @apiviz.landmark
76   * @apiviz.uses org.jboss.netty.util.HashedWheelTimer
77   * @apiviz.has org.jboss.netty.handler.timeout.TimeoutException oneway - - raises
78   */
79  @Sharable
80  public class ReadTimeoutHandler extends SimpleChannelUpstreamHandler
81                                  implements LifeCycleAwareChannelHandler,
82                                             ExternalResourceReleasable {
83  
84      static final ReadTimeoutException EXCEPTION = new ReadTimeoutException();
85  
86      final Timer timer;
87      final long timeoutMillis;
88  
89      /**
90       * Creates a new instance.
91       *
92       * @param timer
93       *        the {@link Timer} that is used to trigger the scheduled event.
94       *        The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
95       * @param timeoutSeconds
96       *        read timeout in seconds
97       */
98      public ReadTimeoutHandler(Timer timer, int timeoutSeconds) {
99          this(timer, timeoutSeconds, TimeUnit.SECONDS);
100     }
101 
102     /**
103      * Creates a new instance.
104      *
105      * @param timer
106      *        the {@link Timer} that is used to trigger the scheduled event.
107      *        The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
108      * @param timeout
109      *        read timeout
110      * @param unit
111      *        the {@link TimeUnit} of {@code timeout}
112      */
113     public ReadTimeoutHandler(Timer timer, long timeout, TimeUnit unit) {
114         if (timer == null) {
115             throw new NullPointerException("timer");
116         }
117         if (unit == null) {
118             throw new NullPointerException("unit");
119         }
120 
121         this.timer = timer;
122         if (timeout <= 0) {
123             timeoutMillis = 0;
124         } else {
125             timeoutMillis = Math.max(unit.toMillis(timeout), 1);
126         }
127     }
128 
129     /**
130      * Stops the {@link Timer} which was specified in the constructor of this
131      * handler.  You should not call this method if the {@link Timer} is in use
132      * by other objects.
133      */
134     public void releaseExternalResources() {
135         timer.stop();
136     }
137 
138     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
139         if (ctx.getPipeline().isAttached()) {
140             // channelOpen event has been fired already, which means
141             // this.channelOpen() will not be invoked.
142             // We have to initialize here instead.
143             initialize(ctx);
144         } else {
145             // channelOpen event has not been fired yet.
146             // this.channelOpen() will be invoked and initialization will occur there.
147         }
148     }
149 
150     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
151         // NOOP
152     }
153 
154     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
155         destroy(ctx);
156     }
157 
158     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
159         // NOOP
160     }
161 
162     @Override
163     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
164             throws Exception {
165         // This method will be invoked only if this handler was added
166         // before channelOpen event is fired.  If a user adds this handler
167         // after the channelOpen event, initialize() will be called by beforeAdd().
168         initialize(ctx);
169         ctx.sendUpstream(e);
170     }
171 
172     @Override
173     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
174             throws Exception {
175         destroy(ctx);
176         ctx.sendUpstream(e);
177     }
178 
179     @Override
180     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
181             throws Exception {
182         State state = (State) ctx.getAttachment();
183         state.lastReadTime = System.currentTimeMillis();
184         ctx.sendUpstream(e);
185     }
186 
187     private void initialize(ChannelHandlerContext ctx) {
188         State state = state(ctx);
189 
190         // Avoid the case where destroy() is called before scheduling timeouts.
191         // See: https://github.com/netty/netty/issues/143
192         synchronized (state) {
193             switch (state.state) {
194             case 1:
195             case 2:
196                 return;
197             }
198             state.state = 1;
199         }
200 
201         if (timeoutMillis > 0) {
202             state.timeout = timer.newTimeout(new ReadTimeoutTask(ctx), timeoutMillis, TimeUnit.MILLISECONDS);
203         }
204     }
205 
206     private static void destroy(ChannelHandlerContext ctx) {
207         State state = state(ctx);
208         synchronized (state) {
209             if (state.state != 1) {
210                 return;
211             }
212             state.state = 2;
213         }
214 
215         if (state.timeout != null) {
216             state.timeout.cancel();
217             state.timeout = null;
218         }
219     }
220 
221     private static State state(ChannelHandlerContext ctx) {
222         State state;
223         synchronized (ctx) {
224             // FIXME: It could have been better if there is setAttachmentIfAbsent().
225             state = (State) ctx.getAttachment();
226             if (state != null) {
227                 return state;
228             }
229             state = new State();
230             ctx.setAttachment(state);
231         }
232         return state;
233     }
234 
235     protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
236         fireExceptionCaught(ctx, EXCEPTION);
237     }
238 
239     private final class ReadTimeoutTask implements TimerTask {
240 
241         private final ChannelHandlerContext ctx;
242 
243         ReadTimeoutTask(ChannelHandlerContext ctx) {
244             this.ctx = ctx;
245         }
246 
247         public void run(Timeout timeout) throws Exception {
248             if (timeout.isCancelled()) {
249                 return;
250             }
251 
252             if (!ctx.getChannel().isOpen()) {
253                 return;
254             }
255 
256             State state = (State) ctx.getAttachment();
257             long currentTime = System.currentTimeMillis();
258             long nextDelay = timeoutMillis - (currentTime - state.lastReadTime);
259             if (nextDelay <= 0) {
260                 // Read timed out - set a new timeout and notify the callback.
261                 state.timeout =
262                     timer.newTimeout(this, timeoutMillis, TimeUnit.MILLISECONDS);
263                 fireReadTimedOut(ctx);
264             } else {
265                 // Read occurred before the timeout - set a new timeout with shorter delay.
266                 state.timeout =
267                     timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
268             }
269         }
270 
271         private void fireReadTimedOut(final ChannelHandlerContext ctx) throws Exception {
272             ctx.getPipeline().execute(new Runnable() {
273 
274                 public void run() {
275                     try {
276                         readTimedOut(ctx);
277                     } catch (Throwable t) {
278                         fireExceptionCaught(ctx, t);
279                     }
280                 }
281             });
282         }
283     }
284 
285     private static final class State {
286         // 0 - none, 1 - initialized, 2 - destroyed
287         int state;
288         volatile Timeout timeout;
289         volatile long lastReadTime = System.currentTimeMillis();
290 
291         State() {
292         }
293     }
294 }