1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.core.future;
21
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.concurrent.TimeUnit;
25
26 import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27 import org.apache.mina.core.service.IoProcessor;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.util.ExceptionMonitor;
30
31 /**
32 * A default implementation of {@link IoFuture} associated with
33 * an {@link IoSession}.
34 *
35 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
36 */
37 public class DefaultIoFuture implements IoFuture {
38
39 /** A number of milliseconds to wait between two deadlock controls ( 5 seconds ) */
40 private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
41
42 /** The associated session */
43 private final IoSession session;
44
45 /** A lock used by the wait() method */
46 private final Object lock;
47
48 /** The first listener. This is easier to have this variable
49 * when we most of the time have one single listener */
50 private IoFutureListener<?> firstListener;
51
52 /** All the other listeners, in case we have more than one */
53 private List<IoFutureListener<?>> otherListeners;
54
55 private Object result;
56
57 /** The flag used to determinate if the Future is completed or not */
58 private boolean ready;
59
60 /** A counter for the number of threads waiting on this future */
61 private int waiters;
62
63 /**
64 * Creates a new instance associated with an {@link IoSession}.
65 *
66 * @param session an {@link IoSession} which is associated with this future
67 */
68 public DefaultIoFuture(IoSession session) {
69 this.session = session;
70 this.lock = this;
71 }
72
73 /**
74 * {@inheritDoc}
75 */
76 public IoSession getSession() {
77 return session;
78 }
79
80 /**
81 * @deprecated Replaced with {@link #awaitUninterruptibly()}.
82 */
83 @Deprecated
84 public void join() {
85 awaitUninterruptibly();
86 }
87
88 /**
89 * @deprecated Replaced with {@link #awaitUninterruptibly(long)}.
90 */
91 @Deprecated
92 public boolean join(long timeoutMillis) {
93 return awaitUninterruptibly(timeoutMillis);
94 }
95
96 /**
97 * {@inheritDoc}
98 */
99 public IoFuture await() throws InterruptedException {
100 synchronized (lock) {
101 while (!ready) {
102 waiters++;
103
104 try {
105 // Wait for a notify, or if no notify is called,
106 // assume that we have a deadlock and exit the
107 // loop to check for a potential deadlock.
108 lock.wait(DEAD_LOCK_CHECK_INTERVAL);
109 } finally {
110 waiters--;
111
112 if (!ready) {
113 checkDeadLock();
114 }
115 }
116 }
117 }
118
119 return this;
120 }
121
122 /**
123 * {@inheritDoc}
124 */
125 public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
126 return await0(unit.toMillis(timeout), true);
127 }
128
129 /**
130 * {@inheritDoc}
131 */
132 public boolean await(long timeoutMillis) throws InterruptedException {
133 return await0(timeoutMillis, true);
134 }
135
136 /**
137 * {@inheritDoc}
138 */
139 public IoFuture awaitUninterruptibly() {
140 try {
141 await0(Long.MAX_VALUE, false);
142 } catch (InterruptedException ie) {
143 // Do nothing : this catch is just mandatory by contract
144 }
145
146 return this;
147 }
148
149 /**
150 * {@inheritDoc}
151 */
152 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
153 try {
154 return await0(unit.toMillis(timeout), false);
155 } catch (InterruptedException e) {
156 throw new InternalError();
157 }
158 }
159
160 /**
161 * {@inheritDoc}
162 */
163 public boolean awaitUninterruptibly(long timeoutMillis) {
164 try {
165 return await0(timeoutMillis, false);
166 } catch (InterruptedException e) {
167 throw new InternalError();
168 }
169 }
170
171 /**
172 * Wait for the Future to be ready. If the requested delay is 0 or
173 * negative, this method immediately returns the value of the
174 * 'ready' flag.
175 * Every 5 second, the wait will be suspended to be able to check if
176 * there is a deadlock or not.
177 *
178 * @param timeoutMillis The delay we will wait for the Future to be ready
179 * @param interruptable Tells if the wait can be interrupted or not
180 * @return <tt>true</tt> if the Future is ready
181 * @throws InterruptedException If the thread has been interrupted
182 * when it's not allowed.
183 */
184 private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
185 long endTime = System.currentTimeMillis() + timeoutMillis;
186
187 if (endTime < 0) {
188 endTime = Long.MAX_VALUE;
189 }
190
191 synchronized (lock) {
192 // We can quit if the ready flag is set to true, or if
193 // the timeout is set to 0 or below : we don't wait in this case.
194 if (ready||(timeoutMillis <= 0)) {
195 return ready;
196 }
197
198 // The operation is not completed : we have to wait
199 waiters++;
200
201 try {
202 for (;;) {
203 try {
204 long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
205
206 // Wait for the requested period of time,
207 // but every DEAD_LOCK_CHECK_INTERVAL seconds, we will
208 // check that we aren't blocked.
209 lock.wait(timeOut);
210 } catch (InterruptedException e) {
211 if (interruptable) {
212 throw e;
213 }
214 }
215
216 if (ready || (endTime < System.currentTimeMillis())) {
217 return ready;
218 } else {
219 // Take a chance, detect a potential deadlock
220 checkDeadLock();
221 }
222 }
223 } finally {
224 // We get here for 3 possible reasons :
225 // 1) We have been notified (the operation has completed a way or another)
226 // 2) We have reached the timeout
227 // 3) The thread has been interrupted
228 // In any case, we decrement the number of waiters, and we get out.
229 waiters--;
230
231 if (!ready) {
232 checkDeadLock();
233 }
234 }
235 }
236 }
237
238 /**
239 * Check for a deadlock, ie look into the stack trace that we don't have already an
240 * instance of the caller.
241 */
242 private void checkDeadLock() {
243 // Only read / write / connect / write future can cause dead lock.
244 if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) {
245 return;
246 }
247
248 // Get the current thread stackTrace.
249 // Using Thread.currentThread().getStackTrace() is the best solution,
250 // even if slightly less efficient than doing a new Exception().getStackTrace(),
251 // as internally, it does exactly the same thing. The advantage of using
252 // this solution is that we may benefit some improvement with some
253 // future versions of Java.
254 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
255
256 // Simple and quick check.
257 for (StackTraceElement stackElement : stackTrace) {
258 if (AbstractPollingIoProcessor.class.getName().equals(stackElement.getClassName())) {
259 IllegalStateException e = new IllegalStateException("t");
260 e.getStackTrace();
261 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
262 + ".await() was invoked from an I/O processor thread. " + "Please use "
263 + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively.");
264 }
265 }
266
267 // And then more precisely.
268 for (StackTraceElement s : stackTrace) {
269 try {
270 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
271
272 if (IoProcessor.class.isAssignableFrom(cls)) {
273 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
274 + ".await() was invoked from an I/O processor thread. " + "Please use "
275 + IoFutureListener.class.getSimpleName()
276 + " or configure a proper thread model alternatively.");
277 }
278 } catch (ClassNotFoundException cnfe) {
279 // Ignore
280 }
281 }
282 }
283
284 /**
285 * {@inheritDoc}
286 */
287 public boolean isDone() {
288 synchronized (lock) {
289 return ready;
290 }
291 }
292
293 /**
294 * Sets the result of the asynchronous operation, and mark it as finished.
295 *
296 * @param newValue The result to store into the Future
297 * @return {@code true} if the value has been set, {@code false} if
298 * the future already has a value (thus is in ready state)
299 */
300 public boolean setValue(Object newValue) {
301 synchronized (lock) {
302 // Allowed only once.
303 if (ready) {
304 return false;
305 }
306
307 result = newValue;
308 ready = true;
309
310 // Now, if we have waiters, notofy them that the operation has completed
311 if (waiters > 0) {
312 lock.notifyAll();
313 }
314 }
315
316 // Last, not least, inform the listeners
317 notifyListeners();
318
319 return true;
320 }
321
322 /**
323 * @return the result of the asynchronous operation.
324 */
325 protected Object getValue() {
326 synchronized (lock) {
327 return result;
328 }
329 }
330
331 /**
332 * {@inheritDoc}
333 */
334 public IoFuture addListener(IoFutureListener<?> listener) {
335 if (listener == null) {
336 throw new IllegalArgumentException("listener");
337 }
338
339 synchronized (lock) {
340 if (ready) {
341 // Shortcut : if the operation has completed, no need to
342 // add a new listener, we just have to notify it. The existing
343 // listeners have already been notified anyway, when the
344 // 'ready' flag has been set.
345 notifyListener(listener);
346 } else {
347 if (firstListener == null) {
348 firstListener = listener;
349 } else {
350 if (otherListeners == null) {
351 otherListeners = new ArrayList<IoFutureListener<?>>(1);
352 }
353
354 otherListeners.add(listener);
355 }
356 }
357 }
358
359 return this;
360 }
361
362 /**
363 * {@inheritDoc}
364 */
365 public IoFuture removeListener(IoFutureListener<?> listener) {
366 if (listener == null) {
367 throw new IllegalArgumentException("listener");
368 }
369
370 synchronized (lock) {
371 if (!ready) {
372 if (listener == firstListener) {
373 if ((otherListeners != null) && !otherListeners.isEmpty()) {
374 firstListener = otherListeners.remove(0);
375 } else {
376 firstListener = null;
377 }
378 } else if (otherListeners != null) {
379 otherListeners.remove(listener);
380 }
381 }
382 }
383
384 return this;
385 }
386
387 /**
388 * Notify the listeners, if we have some.
389 */
390 private void notifyListeners() {
391 // There won't be any visibility problem or concurrent modification
392 // because 'ready' flag will be checked against both addListener and
393 // removeListener calls.
394 if (firstListener != null) {
395 notifyListener(firstListener);
396 firstListener = null;
397
398 if (otherListeners != null) {
399 for (IoFutureListener<?> listener : otherListeners) {
400 notifyListener(listener);
401 }
402
403 otherListeners = null;
404 }
405 }
406 }
407
408 @SuppressWarnings("unchecked")
409 private void notifyListener(IoFutureListener listener) {
410 try {
411 listener.operationComplete(this);
412 } catch (Exception e) {
413 ExceptionMonitor.getInstance().exceptionCaught(e);
414 }
415 }
416 }