查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.common;
21  
22  import java.nio.ByteOrder;
23  
24  import org.apache.mina.common.support.BaseByteBuffer;
25  import org.apache.mina.util.ExpiringStack;
26  
27  /**
28   * A {@link ByteBufferAllocator} which pools allocated buffers. <p> All buffers are allocated with the size of power of
29   * 2 (e.g. 16, 32, 64, ...) This means that you cannot simply assume that the actual capacity of the buffer and the
30   * capacity you requested are same. </p> <p> This allocator releases the buffers which have not been in use for a
31   * certain period.  You can adjust the period by calling {@link #setTimeout(int)}. The default timeout is 1 minute (60
32   * seconds).  To release these buffers periodically, a daemon thread is started when a new instance of the allocator is
33   * created.  You can stop the thread by calling {@link #dispose()}. </p>
34   *
35   * @author The Apache Directory Project (mina-dev@directory.apache.org)
36   * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (Fri, 13 Jul 2007) $
37   */
38  public class PooledByteBufferAllocator implements ByteBufferAllocator {
39      private static final int MINIMUM_CAPACITY = 1;
40  
41      private static int threadId = 0;
42  
43      private final Expirer expirer;
44  
45      private final ExpiringStack[] heapBufferStacks = new ExpiringStack[] {
46              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
47              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
48              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
49              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
50              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
51              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
52              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
53              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
54              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
55              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
56              new ExpiringStack(), new ExpiringStack(), };
57  
58      private final ExpiringStack[] directBufferStacks = new ExpiringStack[] {
59              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
60              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
61              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
62              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
63              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
64              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
65              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
66              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
67              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
68              new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
69              new ExpiringStack(), new ExpiringStack(), };
70  
71      private int timeout;
72  
73      private boolean disposed;
74  
75      /**
76       * Creates a new instance with the default timeout.
77       */
78      public PooledByteBufferAllocator() {
79          this(60);
80      }
81  
82      /**
83       * Creates a new instance with the specified <tt>timeout</tt>.
84       */
85      public PooledByteBufferAllocator(int timeout) {
86          setTimeout(timeout);
87          expirer = new Expirer();
88          expirer.start();
89      }
90  
91      /**
92       * Stops the thread which releases unused buffers and make this allocator unusable from now on.
93       */
94      public void dispose() {
95          if (this == ByteBuffer.getAllocator()) {
96              throw new IllegalStateException("This allocator is in use.");
97          }
98  
99          expirer.shutdown();
100 
101         for (int i = directBufferStacks.length - 1; i >= 0; i--) {
102             ExpiringStack stack = directBufferStacks[i];
103             synchronized (stack) {
104                 stack.clear();
105             }
106         }
107         for (int i = heapBufferStacks.length - 1; i >= 0; i--) {
108             ExpiringStack stack = heapBufferStacks[i];
109             synchronized (stack) {
110                 stack.clear();
111             }
112         }
113         disposed = true;
114     }
115 
116     /**
117      * Returns the timeout value of this allocator in seconds.
118      */
119     public int getTimeout() {
120         return timeout;
121     }
122 
123     /**
124      * Returns the timeout value of this allocator in milliseconds.
125      */
126     public long getTimeoutMillis() {
127         return timeout * 1000L;
128     }
129 
130     /**
131      * Sets the timeout value of this allocator in seconds.
132      *
133      * @param timeout <tt>0</tt> or negative value to disable timeout.
134      */
135     public void setTimeout(int timeout) {
136         if (timeout < 0) {
137             timeout = 0;
138         }
139 
140         this.timeout = timeout;
141 
142         if (timeout > 0) {
143 
144         }
145     }
146 
147     public ByteBuffer allocate(int capacity, boolean direct) {
148         ensureNotDisposed();
149         UnexpandableByteBuffer ubb = allocate0(capacity, direct);
150         PooledByteBuffer buf = allocateContainer();
151         buf.init(ubb, true);
152         return buf;
153     }
154 
155     private PooledByteBuffer allocateContainer() {
156         return new PooledByteBuffer();
157     }
158 
159     private UnexpandableByteBuffer allocate0(int capacity, boolean direct) {
160         ExpiringStack[] bufferStacks = direct ? directBufferStacks
161                 : heapBufferStacks;
162         int idx = getBufferStackIndex(bufferStacks, capacity);
163         ExpiringStack stack = bufferStacks[idx];
164 
165         UnexpandableByteBuffer buf;
166         synchronized (stack) {
167             buf = (UnexpandableByteBuffer) stack.pop();
168         }
169 
170         if (buf == null) {
171             java.nio.ByteBuffer nioBuf = direct ? java.nio.ByteBuffer
172                     .allocateDirect(MINIMUM_CAPACITY << idx)
173                     : java.nio.ByteBuffer.allocate(MINIMUM_CAPACITY << idx);
174             buf = new UnexpandableByteBuffer(nioBuf);
175         }
176 
177         buf.init();
178 
179         return buf;
180     }
181 
182     private void release0(UnexpandableByteBuffer buf) {
183         ExpiringStack[] bufferStacks = buf.buf().isDirect() ? directBufferStacks
184                 : heapBufferStacks;
185         ExpiringStack stack = bufferStacks[getBufferStackIndex(bufferStacks,
186                 buf.buf().capacity())];
187 
188         synchronized (stack) {
189             // push back
190             stack.push(buf);
191         }
192     }
193 
194     public ByteBuffer wrap(java.nio.ByteBuffer nioBuffer) {
195         ensureNotDisposed();
196         PooledByteBuffer buf = allocateContainer();
197         buf.init(new UnexpandableByteBuffer(nioBuffer), false);
198         buf.buf.init();
199         buf.setPooled(false);
200         return buf;
201     }
202 
203     private int getBufferStackIndex(ExpiringStack[] bufferStacks, int size) {
204         int targetSize = MINIMUM_CAPACITY;
205         int stackIdx = 0;
206         while (size > targetSize) {
207             targetSize <<= 1;
208             stackIdx++;
209             if (stackIdx >= bufferStacks.length) {
210                 throw new IllegalArgumentException("Buffer size is too big: "
211                         + size);
212             }
213         }
214 
215         return stackIdx;
216     }
217 
218     private void ensureNotDisposed() {
219         if (disposed) {
220             throw new IllegalStateException(
221                     "This allocator is disposed already.");
222         }
223     }
224 
225     private class Expirer extends Thread {
226         private boolean timeToStop;
227 
228         Expirer() {
229             super("PooledByteBufferExpirer-" + threadId++);
230             setDaemon(true);
231         }
232 
233         public void shutdown() {
234             timeToStop = true;
235             interrupt();
236             while (isAlive()) {
237                 try {
238                     join();
239                 } catch (InterruptedException e) {
240                     //ignore since this is shutdown time
241                 }
242             }
243         }
244 
245         public void run() {
246             // Expire unused buffers every seconds
247             while (!timeToStop) {
248                 try {
249                     Thread.sleep(1000);
250                 } catch (InterruptedException e) {
251                     //ignore
252                 }
253 
254                 // Check if expiration is disabled.
255                 long timeout = getTimeoutMillis();
256                 if (timeout <= 0L) {
257                     continue;
258                 }
259 
260                 // Expire old buffers
261                 long expirationTime = System.currentTimeMillis() - timeout;
262 
263                 for (int i = directBufferStacks.length - 1; i >= 0; i--) {
264                     ExpiringStack stack = directBufferStacks[i];
265                     synchronized (stack) {
266                         stack.expireBefore(expirationTime);
267                     }
268                 }
269 
270                 for (int i = heapBufferStacks.length - 1; i >= 0; i--) {
271                     ExpiringStack stack = heapBufferStacks[i];
272                     synchronized (stack) {
273                         stack.expireBefore(expirationTime);
274                     }
275                 }
276             }
277         }
278     }
279 
280     private class PooledByteBuffer extends BaseByteBuffer {
281         private UnexpandableByteBuffer buf;
282 
283         private int refCount = 1;
284 
285         protected PooledByteBuffer() {
286         }
287 
288         public synchronized void init(UnexpandableByteBuffer buf, boolean clear) {
289             this.buf = buf;
290             if (clear) {
291                 buf.buf().clear();
292             }
293             buf.buf().order(ByteOrder.BIG_ENDIAN);
294             setAutoExpand(false);
295             refCount = 1;
296         }
297 
298         public synchronized void acquire() {
299             if (refCount <= 0) {
300                 throw new IllegalStateException("Already released buffer.");
301             }
302 
303             refCount++;
304         }
305 
306         public void release() {
307             synchronized (this) {
308                 if (refCount <= 0) {
309                     refCount = 0;
310                     throw new IllegalStateException(
311                             "Already released buffer.  You released the buffer too many times.");
312                 }
313 
314                 refCount--;
315                 if (refCount > 0) {
316                     return;
317                 }
318             }
319 
320             // No need to return buffers to the pool if it is disposed already.
321             if (disposed) {
322                 return;
323             }
324 
325             buf.release();
326         }
327 
328         public java.nio.ByteBuffer buf() {
329             return buf.buf();
330         }
331 
332         public boolean isPooled() {
333             return buf.isPooled();
334         }
335 
336         public void setPooled(boolean pooled) {
337             buf.setPooled(pooled);
338         }
339 
340         public ByteBuffer duplicate() {
341             PooledByteBuffer newBuf = allocateContainer();
342             newBuf.init(new UnexpandableByteBuffer(buf().duplicate(), buf),
343                     false);
344             return newBuf;
345         }
346 
347         public ByteBuffer slice() {
348             PooledByteBuffer newBuf = allocateContainer();
349             newBuf.init(new UnexpandableByteBuffer(buf().slice(), buf), false);
350             return newBuf;
351         }
352 
353         public ByteBuffer asReadOnlyBuffer() {
354             PooledByteBuffer newBuf = allocateContainer();
355             newBuf.init(new UnexpandableByteBuffer(buf().asReadOnlyBuffer(),
356                     buf), false);
357             return newBuf;
358         }
359 
360         public byte[] array() {
361             return buf().array();
362         }
363 
364         public int arrayOffset() {
365             return buf().arrayOffset();
366         }
367 
368         protected void capacity0(int requestedCapacity) {
369             if (buf.isDerived()) {
370                 throw new IllegalStateException(
371                         "Derived buffers cannot be expanded.");
372             }
373 
374             int newCapacity = MINIMUM_CAPACITY;
375             while (newCapacity < requestedCapacity) {
376                 newCapacity <<= 1;
377             }
378 
379             UnexpandableByteBuffer oldBuf = this.buf;
380             boolean direct = isDirect();
381             UnexpandableByteBuffer newBuf;
382 
383             try {
384                 newBuf = allocate0(newCapacity, direct);
385             } catch (OutOfMemoryError e) {
386                 if (direct) {
387                     newBuf = allocate0(newCapacity, false);
388                 } else {
389                     throw e;
390                 }
391             }
392 
393             newBuf.buf().clear();
394             oldBuf.buf().clear();
395             newBuf.buf().put(oldBuf.buf());
396             this.buf = newBuf;
397             oldBuf.release();
398         }
399     }
400 
401     private class UnexpandableByteBuffer {
402         private final java.nio.ByteBuffer buf;
403 
404         private final UnexpandableByteBuffer parentBuf;
405 
406         private int refCount;
407 
408         private boolean pooled;
409 
410         protected UnexpandableByteBuffer(java.nio.ByteBuffer buf) {
411             this.buf = buf;
412             this.parentBuf = null;
413         }
414 
415         protected UnexpandableByteBuffer(java.nio.ByteBuffer buf,
416                 UnexpandableByteBuffer parentBuf) {
417             parentBuf.acquire();
418             this.buf = buf;
419             this.parentBuf = parentBuf;
420         }
421 
422         public void init() {
423             refCount = 1;
424             pooled = true;
425         }
426 
427         public synchronized void acquire() {
428             if (isDerived()) {
429                 parentBuf.acquire();
430                 return;
431             }
432 
433             if (refCount <= 0) {
434                 throw new IllegalStateException("Already released buffer.");
435             }
436 
437             refCount++;
438         }
439 
440         public void release() {
441             if (isDerived()) {
442                 parentBuf.release();
443                 return;
444             }
445 
446             synchronized (this) {
447                 if (refCount <= 0) {
448                     refCount = 0;
449                     throw new IllegalStateException(
450                             "Already released buffer.  You released the buffer too many times.");
451                 }
452 
453                 refCount--;
454                 if (refCount > 0) {
455                     return;
456                 }
457             }
458 
459             // No need to return buffers to the pool if it is disposed already.
460             if (disposed) {
461                 return;
462             }
463 
464             if (pooled) {
465                 if (parentBuf != null) {
466                     release0(parentBuf);
467                 } else {
468                     release0(this);
469                 }
470             }
471         }
472 
473         public java.nio.ByteBuffer buf() {
474             return buf;
475         }
476 
477         public boolean isPooled() {
478             return pooled;
479         }
480 
481         public void setPooled(boolean pooled) {
482             this.pooled = pooled;
483         }
484 
485         public boolean isDerived() {
486             return parentBuf != null;
487         }
488     }
489 }