查看本类的 API 文档 | 回源码主页 | 即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.common.support;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  
25  import org.apache.mina.common.ExceptionMonitor;
26  import org.apache.mina.common.IoFuture;
27  import org.apache.mina.common.IoFutureListener;
28  import org.apache.mina.common.IoSession;
29  
30  /**
31   * A default implementation of {@link IoFuture}.
32   *  
33   * @author The Apache MINA Project (dev@mina.apache.org)
34   * @version $Rev: 599822 $, $Date: 2007-11-30 22:54:07 +0900 (Fri, 30 Nov 2007) $
35   */
36  public class DefaultIoFuture implements IoFuture {
37  
38      private final IoSession session;
39      private final Object lock;
40      private IoFutureListener firstListener;
41      private List<IoFutureListener> otherListeners;
42      private Object result;
43      private boolean ready;
44      private int waiters;
45  
46      /**
47       * Creates a new instance.
48       *
49       * @param session an {@link IoSession} which is associated with this future
50       */
51      public DefaultIoFuture(IoSession session) {
52          this.session = session;
53          this.lock = this;
54      }
55  
56      /**
57       * Creates a new instance.
58       *
59       * @param session an {@link IoSession} which is associated with this future
60       */
61      public DefaultIoFuture(IoSession session, Object lock) {
62          this.session = session;
63          this.lock = lock;
64      }
65  
66      public IoSession getSession() {
67          return session;
68      }
69  
70      public Object getLock() {
71          return lock;
72      }
73  
74      public void join() {
75          awaitUninterruptibly();
76      }
77  
78      public boolean join(long timeoutMillis) {
79          return awaitUninterruptibly(timeoutMillis);
80      }
81  
82      private IoFuture awaitUninterruptibly() {
83          synchronized (lock) {
84              while (!ready) {
85                  waiters++;
86                  try {
87                      lock.wait();
88                  } catch (InterruptedException e) {
89                  } finally {
90                      waiters--;
91                  }
92              }
93          }
94  
95          return this;
96      }
97  
98      private boolean awaitUninterruptibly(long timeoutMillis) {
99          try {
100             return await0(timeoutMillis, false);
101         } catch (InterruptedException e) {
102             throw new InternalError();
103         }
104     }
105 
106     private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
107         long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
108         long waitTime = timeoutMillis;
109 
110         synchronized (lock) {
111             if (ready) {
112                 return ready;
113             } else if (waitTime <= 0) {
114                 return ready;
115             }
116 
117             waiters++;
118             try {
119                 for (;;) {
120                     try {
121                         lock.wait(waitTime);
122                     } catch (InterruptedException e) {
123                         if (interruptable) {
124                             throw e;
125                         }
126                     }
127 
128                     if (ready) {
129                         return true;
130                     } else {
131                         waitTime = timeoutMillis
132                                 - (System.currentTimeMillis() - startTime);
133                         if (waitTime <= 0) {
134                             return ready;
135                         }
136                     }
137                 }
138             } finally {
139                 waiters--;
140             }
141         }
142     }
143 
144     public boolean isReady() {
145         synchronized (lock) {
146             return ready;
147         }
148     }
149 
150     /**
151      * Sets the result of the asynchronous operation, and mark it as finished.
152      */
153     protected void setValue(Object newValue) {
154         synchronized (lock) {
155             // Allow only once.
156             if (ready) {
157                 return;
158             }
159 
160             result = newValue;
161             ready = true;
162             if (waiters > 0) {
163                 lock.notifyAll();
164             }
165         }
166 
167         notifyListeners();
168     }
169 
170     /**
171      * Returns the result of the asynchronous operation.
172      */
173     protected Object getValue() {
174         synchronized (lock) {
175             return result;
176         }
177     }
178 
179     public void addListener(IoFutureListener listener) {
180         if (listener == null) {
181             throw new NullPointerException("listener");
182         }
183 
184         boolean notifyNow = false;
185         synchronized (lock) {
186             if (ready) {
187                 notifyNow = true;
188             } else {
189                 if (firstListener == null) {
190                     firstListener = listener;
191                 } else {
192                     if (otherListeners == null) {
193                         otherListeners = new ArrayList<IoFutureListener>(1);
194                     }
195                     otherListeners.add(listener);
196                 }
197             }
198         }
199 
200         if (notifyNow) {
201             notifyListener(listener);
202         }
203     }
204 
205     public void removeListener(IoFutureListener listener) {
206         if (listener == null) {
207             throw new NullPointerException("listener");
208         }
209 
210         synchronized (lock) {
211             if (!ready) {
212                 if (listener == firstListener) {
213                     if (otherListeners != null && !otherListeners.isEmpty()) {
214                         firstListener = otherListeners.remove(0);
215                     } else {
216                         firstListener = null;
217                     }
218                 } else if (otherListeners != null) {
219                     otherListeners.remove(listener);
220                 }
221             }
222         }
223     }
224 
225     private void notifyListeners() {
226         // There won't be any visibility problem or concurrent modification
227         // because 'ready' flag will be checked against both addListener and
228         // removeListener calls.
229         if (firstListener != null) {
230             notifyListener(firstListener);
231             firstListener = null;
232 
233             if (otherListeners != null) {
234                 for (IoFutureListener l : otherListeners) {
235                     notifyListener(l);
236                 }
237                 otherListeners = null;
238             }
239         }
240     }
241 
242     @SuppressWarnings("unchecked")
243     private void notifyListener(IoFutureListener l) {
244         try {
245             l.operationComplete(this);
246         } catch (Throwable t) {
247             ExceptionMonitor.getInstance().exceptionCaught(t);
248         }
249     }
250 }