查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.future;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27  import org.apache.mina.core.service.IoProcessor;
28  import org.apache.mina.core.session.IoSession;
29  import org.apache.mina.util.ExceptionMonitor;
30  
31  /**
32   * A default implementation of {@link IoFuture} associated with
33   * an {@link IoSession}.
34   * 
35   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
36   */
37  public class DefaultIoFuture implements IoFuture {
38  
39      /** A number of milliseconds to wait between two deadlock controls ( 5 seconds ) */
40      private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
41  
42      /** The associated session */
43      private final IoSession session;
44  
45      /** A lock used by the wait() method */
46      private final Object lock;
47  
48      /** The first listener. This is easier to have this variable
49       * when we most of the time have one single listener */
50      private IoFutureListener<?> firstListener;
51  
52      /** All the other listeners, in case we have more than one */
53      private List<IoFutureListener<?>> otherListeners;
54  
55      private Object result;
56  
57      /** The flag used to determinate if the Future is completed or not */
58      private boolean ready;
59  
60      /** A counter for the number of threads waiting on this future */
61      private int waiters;
62  
63      /**
64       * Creates a new instance associated with an {@link IoSession}.
65       *
66       * @param session an {@link IoSession} which is associated with this future
67       */
68      public DefaultIoFuture(IoSession session) {
69          this.session = session;
70          this.lock = this;
71      }
72  
73      /**
74       * {@inheritDoc}
75       */
76      public IoSession getSession() {
77          return session;
78      }
79  
80      /**
81       * @deprecated Replaced with {@link #awaitUninterruptibly()}.
82       */
83      @Deprecated
84      public void join() {
85          awaitUninterruptibly();
86      }
87  
88      /**
89       * @deprecated Replaced with {@link #awaitUninterruptibly(long)}.
90       */
91      @Deprecated
92      public boolean join(long timeoutMillis) {
93          return awaitUninterruptibly(timeoutMillis);
94      }
95  
96      /**
97       * {@inheritDoc}
98       */
99      public IoFuture await() throws InterruptedException {
100         synchronized (lock) {
101             while (!ready) {
102                 waiters++;
103                 
104                 try {
105                     // Wait for a notify, or if no notify is called,
106                     // assume that we have a deadlock and exit the
107                     // loop to check for a potential deadlock.
108                     lock.wait(DEAD_LOCK_CHECK_INTERVAL);
109                 } finally {
110                     waiters--;
111                     
112                     if (!ready) {
113                         checkDeadLock();
114                     }
115                 }
116             }
117         }
118         
119         return this;
120     }
121 
122     /**
123      * {@inheritDoc}
124      */
125     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
126         return await0(unit.toMillis(timeout), true);
127     }
128 
129     /**
130      * {@inheritDoc}
131      */
132     public boolean await(long timeoutMillis) throws InterruptedException {
133         return await0(timeoutMillis, true);
134     }
135 
136     /**
137      * {@inheritDoc}
138      */
139     public IoFuture awaitUninterruptibly() {
140         try {
141             await0(Long.MAX_VALUE, false);
142         } catch (InterruptedException ie) {
143             // Do nothing : this catch is just mandatory by contract
144         }
145 
146         return this;
147     }
148 
149     /**
150      * {@inheritDoc}
151      */
152     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
153         try {
154             return await0(unit.toMillis(timeout), false);
155         } catch (InterruptedException e) {
156             throw new InternalError();
157         }
158     }
159 
160     /**
161      * {@inheritDoc}
162      */
163     public boolean awaitUninterruptibly(long timeoutMillis) {
164         try {
165             return await0(timeoutMillis, false);
166         } catch (InterruptedException e) {
167             throw new InternalError();
168         }
169     }
170 
171     /**
172      * Wait for the Future to be ready. If the requested delay is 0 or
173      * negative, this method immediately returns the value of the
174      * 'ready' flag.
175      * Every 5 second, the wait will be suspended to be able to check if
176      * there is a deadlock or not.
177      * 
178      * @param timeoutMillis The delay we will wait for the Future to be ready
179      * @param interruptable Tells if the wait can be interrupted or not
180      * @return <tt>true</tt> if the Future is ready
181      * @throws InterruptedException If the thread has been interrupted
182      * when it's not allowed.
183      */
184     private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
185         long endTime = System.currentTimeMillis() + timeoutMillis;
186 
187         if (endTime < 0) {
188             endTime = Long.MAX_VALUE;
189         }
190 
191         synchronized (lock) {
192             // We can quit if the ready flag is set to true, or if
193             // the timeout is set to 0 or below : we don't wait in this case.
194             if (ready||(timeoutMillis <= 0)) {
195                 return ready;
196             }
197 
198             // The operation is not completed : we have to wait
199             waiters++;
200 
201             try {
202                 for (;;) {
203                     try {
204                         long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
205                         
206                         // Wait for the requested period of time,
207                         // but every DEAD_LOCK_CHECK_INTERVAL seconds, we will
208                         // check that we aren't blocked.
209                         lock.wait(timeOut);
210                     } catch (InterruptedException e) {
211                         if (interruptable) {
212                             throw e;
213                         }
214                     }
215 
216                     if (ready || (endTime < System.currentTimeMillis())) {
217                         return ready;
218                     } else {
219                         // Take a chance, detect a potential deadlock
220                         checkDeadLock();
221                     }
222                 }
223             } finally {
224                 // We get here for 3 possible reasons :
225                 // 1) We have been notified (the operation has completed a way or another)
226                 // 2) We have reached the timeout
227                 // 3) The thread has been interrupted
228                 // In any case, we decrement the number of waiters, and we get out.
229                 waiters--;
230                 
231                 if (!ready) {
232                     checkDeadLock();
233                 }
234             }
235         }
236     }
237 
238     /**
239      * Check for a deadlock, ie look into the stack trace that we don't have already an 
240      * instance of the caller.
241      */
242     private void checkDeadLock() {
243         // Only read / write / connect / write future can cause dead lock.
244         if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) {
245             return;
246         }
247 
248         // Get the current thread stackTrace.
249         // Using Thread.currentThread().getStackTrace() is the best solution,
250         // even if slightly less efficient than doing a new Exception().getStackTrace(),
251         // as internally, it does exactly the same thing. The advantage of using
252         // this solution is that we may benefit some improvement with some
253         // future versions of Java.
254         StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
255 
256         // Simple and quick check.
257         for (StackTraceElement stackElement : stackTrace) {
258             if (AbstractPollingIoProcessor.class.getName().equals(stackElement.getClassName())) {
259                 IllegalStateException e = new IllegalStateException("t");
260                 e.getStackTrace();
261                 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
262                         + ".await() was invoked from an I/O processor thread.  " + "Please use "
263                         + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively.");
264             }
265         }
266 
267         // And then more precisely.
268         for (StackTraceElement s : stackTrace) {
269             try {
270                 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
271                 
272                 if (IoProcessor.class.isAssignableFrom(cls)) {
273                     throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
274                             + ".await() was invoked from an I/O processor thread.  " + "Please use "
275                             + IoFutureListener.class.getSimpleName()
276                             + " or configure a proper thread model alternatively.");
277                 }
278             } catch (ClassNotFoundException cnfe) {
279                 // Ignore
280             }
281         }
282     }
283 
284     /**
285      * {@inheritDoc}
286      */
287     public boolean isDone() {
288         synchronized (lock) {
289             return ready;
290         }
291     }
292 
293     /**
294      * Sets the result of the asynchronous operation, and mark it as finished.
295      * 
296      * @param newValue The result to store into the Future
297      * @return {@code true} if the value has been set, {@code false} if
298      * the future already has a value (thus is in ready state)
299      */
300     public boolean setValue(Object newValue) {
301         synchronized (lock) {
302             // Allowed only once.
303             if (ready) {
304                 return false;
305             }
306 
307             result = newValue;
308             ready = true;
309             
310             // Now, if we have waiters, notofy them that the operation has completed
311             if (waiters > 0) {
312                 lock.notifyAll();
313             }
314         }
315 
316         // Last, not least, inform the listeners
317         notifyListeners();
318         
319         return true;
320     }
321 
322     /**
323      * @return the result of the asynchronous operation.
324      */
325     protected Object getValue() {
326         synchronized (lock) {
327             return result;
328         }
329     }
330 
331     /**
332      * {@inheritDoc}
333      */
334     public IoFuture addListener(IoFutureListener<?> listener) {
335         if (listener == null) {
336             throw new IllegalArgumentException("listener");
337         }
338 
339         synchronized (lock) {
340             if (ready) {
341                 // Shortcut : if the operation has completed, no need to 
342                 // add a new listener, we just have to notify it. The existing
343                 // listeners have already been notified anyway, when the 
344                 // 'ready' flag has been set.
345                 notifyListener(listener);
346             } else {
347                 if (firstListener == null) {
348                     firstListener = listener;
349                 } else {
350                     if (otherListeners == null) {
351                         otherListeners = new ArrayList<IoFutureListener<?>>(1);
352                     }
353                     
354                     otherListeners.add(listener);
355                 }
356             }
357         }
358         
359         return this;
360     }
361 
362     /**
363      * {@inheritDoc}
364      */
365     public IoFuture removeListener(IoFutureListener<?> listener) {
366         if (listener == null) {
367             throw new IllegalArgumentException("listener");
368         }
369 
370         synchronized (lock) {
371             if (!ready) {
372                 if (listener == firstListener) {
373                     if ((otherListeners != null) && !otherListeners.isEmpty()) {
374                         firstListener = otherListeners.remove(0);
375                     } else {
376                         firstListener = null;
377                     }
378                 } else if (otherListeners != null) {
379                     otherListeners.remove(listener);
380                 }
381             }
382         }
383 
384         return this;
385     }
386 
387     /**
388      * Notify the listeners, if we have some.
389      */
390     private void notifyListeners() {
391         // There won't be any visibility problem or concurrent modification
392         // because 'ready' flag will be checked against both addListener and
393         // removeListener calls.
394         if (firstListener != null) {
395             notifyListener(firstListener);
396             firstListener = null;
397 
398             if (otherListeners != null) {
399                 for (IoFutureListener<?> listener : otherListeners) {
400                     notifyListener(listener);
401                 }
402                 
403                 otherListeners = null;
404             }
405         }
406     }
407 
408     @SuppressWarnings("unchecked")
409     private void notifyListener(IoFutureListener listener) {
410         try {
411             listener.operationComplete(this);
412         } catch (Exception e) {
413             ExceptionMonitor.getInstance().exceptionCaught(e);
414         }
415     }
416 }