1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.common;
21
22 import java.nio.ByteOrder;
23
24 import org.apache.mina.common.support.BaseByteBuffer;
25 import org.apache.mina.util.ExpiringStack;
26
27
28
29
30
31
32
33
34
35
36
37
38 public class PooledByteBufferAllocator implements ByteBufferAllocator {
39 private static final int MINIMUM_CAPACITY = 1;
40
41 private static int threadId = 0;
42
43 private final Expirer expirer;
44
45 private final ExpiringStack[] heapBufferStacks = new ExpiringStack[] {
46 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
47 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
48 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
49 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
50 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
51 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
52 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
53 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
54 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
55 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
56 new ExpiringStack(), new ExpiringStack(), };
57
58 private final ExpiringStack[] directBufferStacks = new ExpiringStack[] {
59 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
60 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
61 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
62 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
63 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
64 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
65 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
66 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
67 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
68 new ExpiringStack(), new ExpiringStack(), new ExpiringStack(),
69 new ExpiringStack(), new ExpiringStack(), };
70
71 private int timeout;
72
73 private boolean disposed;
74
75
76
77
78 public PooledByteBufferAllocator() {
79 this(60);
80 }
81
82
83
84
85 public PooledByteBufferAllocator(int timeout) {
86 setTimeout(timeout);
87 expirer = new Expirer();
88 expirer.start();
89 }
90
91
92
93
94 public void dispose() {
95 if (this == ByteBuffer.getAllocator()) {
96 throw new IllegalStateException("This allocator is in use.");
97 }
98
99 expirer.shutdown();
100
101 for (int i = directBufferStacks.length - 1; i >= 0; i--) {
102 ExpiringStack stack = directBufferStacks[i];
103 synchronized (stack) {
104 stack.clear();
105 }
106 }
107 for (int i = heapBufferStacks.length - 1; i >= 0; i--) {
108 ExpiringStack stack = heapBufferStacks[i];
109 synchronized (stack) {
110 stack.clear();
111 }
112 }
113 disposed = true;
114 }
115
116
117
118
119 public int getTimeout() {
120 return timeout;
121 }
122
123
124
125
126 public long getTimeoutMillis() {
127 return timeout * 1000L;
128 }
129
130
131
132
133
134
135 public void setTimeout(int timeout) {
136 if (timeout < 0) {
137 timeout = 0;
138 }
139
140 this.timeout = timeout;
141
142 if (timeout > 0) {
143
144 }
145 }
146
147 public ByteBuffer allocate(int capacity, boolean direct) {
148 ensureNotDisposed();
149 UnexpandableByteBuffer ubb = allocate0(capacity, direct);
150 PooledByteBuffer buf = allocateContainer();
151 buf.init(ubb, true);
152 return buf;
153 }
154
155 private PooledByteBuffer allocateContainer() {
156 return new PooledByteBuffer();
157 }
158
159 private UnexpandableByteBuffer allocate0(int capacity, boolean direct) {
160 ExpiringStack[] bufferStacks = direct ? directBufferStacks
161 : heapBufferStacks;
162 int idx = getBufferStackIndex(bufferStacks, capacity);
163 ExpiringStack stack = bufferStacks[idx];
164
165 UnexpandableByteBuffer buf;
166 synchronized (stack) {
167 buf = (UnexpandableByteBuffer) stack.pop();
168 }
169
170 if (buf == null) {
171 java.nio.ByteBuffer nioBuf = direct ? java.nio.ByteBuffer
172 .allocateDirect(MINIMUM_CAPACITY << idx)
173 : java.nio.ByteBuffer.allocate(MINIMUM_CAPACITY << idx);
174 buf = new UnexpandableByteBuffer(nioBuf);
175 }
176
177 buf.init();
178
179 return buf;
180 }
181
182 private void release0(UnexpandableByteBuffer buf) {
183 ExpiringStack[] bufferStacks = buf.buf().isDirect() ? directBufferStacks
184 : heapBufferStacks;
185 ExpiringStack stack = bufferStacks[getBufferStackIndex(bufferStacks,
186 buf.buf().capacity())];
187
188 synchronized (stack) {
189
190 stack.push(buf);
191 }
192 }
193
194 public ByteBuffer wrap(java.nio.ByteBuffer nioBuffer) {
195 ensureNotDisposed();
196 PooledByteBuffer buf = allocateContainer();
197 buf.init(new UnexpandableByteBuffer(nioBuffer), false);
198 buf.buf.init();
199 buf.setPooled(false);
200 return buf;
201 }
202
203 private int getBufferStackIndex(ExpiringStack[] bufferStacks, int size) {
204 int targetSize = MINIMUM_CAPACITY;
205 int stackIdx = 0;
206 while (size > targetSize) {
207 targetSize <<= 1;
208 stackIdx++;
209 if (stackIdx >= bufferStacks.length) {
210 throw new IllegalArgumentException("Buffer size is too big: "
211 + size);
212 }
213 }
214
215 return stackIdx;
216 }
217
218 private void ensureNotDisposed() {
219 if (disposed) {
220 throw new IllegalStateException(
221 "This allocator is disposed already.");
222 }
223 }
224
225 private class Expirer extends Thread {
226 private boolean timeToStop;
227
228 Expirer() {
229 super("PooledByteBufferExpirer-" + threadId++);
230 setDaemon(true);
231 }
232
233 public void shutdown() {
234 timeToStop = true;
235 interrupt();
236 while (isAlive()) {
237 try {
238 join();
239 } catch (InterruptedException e) {
240
241 }
242 }
243 }
244
245 public void run() {
246
247 while (!timeToStop) {
248 try {
249 Thread.sleep(1000);
250 } catch (InterruptedException e) {
251
252 }
253
254
255 long timeout = getTimeoutMillis();
256 if (timeout <= 0L) {
257 continue;
258 }
259
260
261 long expirationTime = System.currentTimeMillis() - timeout;
262
263 for (int i = directBufferStacks.length - 1; i >= 0; i--) {
264 ExpiringStack stack = directBufferStacks[i];
265 synchronized (stack) {
266 stack.expireBefore(expirationTime);
267 }
268 }
269
270 for (int i = heapBufferStacks.length - 1; i >= 0; i--) {
271 ExpiringStack stack = heapBufferStacks[i];
272 synchronized (stack) {
273 stack.expireBefore(expirationTime);
274 }
275 }
276 }
277 }
278 }
279
280 private class PooledByteBuffer extends BaseByteBuffer {
281 private UnexpandableByteBuffer buf;
282
283 private int refCount = 1;
284
285 protected PooledByteBuffer() {
286 }
287
288 public synchronized void init(UnexpandableByteBuffer buf, boolean clear) {
289 this.buf = buf;
290 if (clear) {
291 buf.buf().clear();
292 }
293 buf.buf().order(ByteOrder.BIG_ENDIAN);
294 setAutoExpand(false);
295 refCount = 1;
296 }
297
298 public synchronized void acquire() {
299 if (refCount <= 0) {
300 throw new IllegalStateException("Already released buffer.");
301 }
302
303 refCount++;
304 }
305
306 public void release() {
307 synchronized (this) {
308 if (refCount <= 0) {
309 refCount = 0;
310 throw new IllegalStateException(
311 "Already released buffer. You released the buffer too many times.");
312 }
313
314 refCount--;
315 if (refCount > 0) {
316 return;
317 }
318 }
319
320
321 if (disposed) {
322 return;
323 }
324
325 buf.release();
326 }
327
328 public java.nio.ByteBuffer buf() {
329 return buf.buf();
330 }
331
332 public boolean isPooled() {
333 return buf.isPooled();
334 }
335
336 public void setPooled(boolean pooled) {
337 buf.setPooled(pooled);
338 }
339
340 public ByteBuffer duplicate() {
341 PooledByteBuffer newBuf = allocateContainer();
342 newBuf.init(new UnexpandableByteBuffer(buf().duplicate(), buf),
343 false);
344 return newBuf;
345 }
346
347 public ByteBuffer slice() {
348 PooledByteBuffer newBuf = allocateContainer();
349 newBuf.init(new UnexpandableByteBuffer(buf().slice(), buf), false);
350 return newBuf;
351 }
352
353 public ByteBuffer asReadOnlyBuffer() {
354 PooledByteBuffer newBuf = allocateContainer();
355 newBuf.init(new UnexpandableByteBuffer(buf().asReadOnlyBuffer(),
356 buf), false);
357 return newBuf;
358 }
359
360 public byte[] array() {
361 return buf().array();
362 }
363
364 public int arrayOffset() {
365 return buf().arrayOffset();
366 }
367
368 protected void capacity0(int requestedCapacity) {
369 if (buf.isDerived()) {
370 throw new IllegalStateException(
371 "Derived buffers cannot be expanded.");
372 }
373
374 int newCapacity = MINIMUM_CAPACITY;
375 while (newCapacity < requestedCapacity) {
376 newCapacity <<= 1;
377 }
378
379 UnexpandableByteBuffer oldBuf = this.buf;
380 boolean direct = isDirect();
381 UnexpandableByteBuffer newBuf;
382
383 try {
384 newBuf = allocate0(newCapacity, direct);
385 } catch (OutOfMemoryError e) {
386 if (direct) {
387 newBuf = allocate0(newCapacity, false);
388 } else {
389 throw e;
390 }
391 }
392
393 newBuf.buf().clear();
394 oldBuf.buf().clear();
395 newBuf.buf().put(oldBuf.buf());
396 this.buf = newBuf;
397 oldBuf.release();
398 }
399 }
400
401 private class UnexpandableByteBuffer {
402 private final java.nio.ByteBuffer buf;
403
404 private final UnexpandableByteBuffer parentBuf;
405
406 private int refCount;
407
408 private boolean pooled;
409
410 protected UnexpandableByteBuffer(java.nio.ByteBuffer buf) {
411 this.buf = buf;
412 this.parentBuf = null;
413 }
414
415 protected UnexpandableByteBuffer(java.nio.ByteBuffer buf,
416 UnexpandableByteBuffer parentBuf) {
417 parentBuf.acquire();
418 this.buf = buf;
419 this.parentBuf = parentBuf;
420 }
421
422 public void init() {
423 refCount = 1;
424 pooled = true;
425 }
426
427 public synchronized void acquire() {
428 if (isDerived()) {
429 parentBuf.acquire();
430 return;
431 }
432
433 if (refCount <= 0) {
434 throw new IllegalStateException("Already released buffer.");
435 }
436
437 refCount++;
438 }
439
440 public void release() {
441 if (isDerived()) {
442 parentBuf.release();
443 return;
444 }
445
446 synchronized (this) {
447 if (refCount <= 0) {
448 refCount = 0;
449 throw new IllegalStateException(
450 "Already released buffer. You released the buffer too many times.");
451 }
452
453 refCount--;
454 if (refCount > 0) {
455 return;
456 }
457 }
458
459
460 if (disposed) {
461 return;
462 }
463
464 if (pooled) {
465 if (parentBuf != null) {
466 release0(parentBuf);
467 } else {
468 release0(this);
469 }
470 }
471 }
472
473 public java.nio.ByteBuffer buf() {
474 return buf;
475 }
476
477 public boolean isPooled() {
478 return pooled;
479 }
480
481 public void setPooled(boolean pooled) {
482 this.pooled = pooled;
483 }
484
485 public boolean isDerived() {
486 return parentBuf != null;
487 }
488 }
489 }