1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.stream;
17
18 import static org.jboss.netty.buffer.ChannelBuffers.*;
19
20 import java.nio.ByteBuffer;
21 import java.nio.channels.ReadableByteChannel;
22
23 import org.jboss.netty.buffer.ChannelBuffer;
24
25 /**
26 * A {@link ChunkedInput} that fetches data from a {@link ReadableByteChannel}
27 * chunk by chunk. Please note that the {@link ReadableByteChannel} must
28 * operate in blocking mode. Non-blocking mode channels are not supported.
29 */
30 public class ChunkedNioStream implements ChunkedInput {
31
32 private final ReadableByteChannel in;
33
34 private final int chunkSize;
35 private long offset;
36
37 /**
38 * Associated ByteBuffer
39 */
40 private final ByteBuffer byteBuffer;
41
42 /**
43 * Creates a new instance that fetches data from the specified channel.
44 */
45 public ChunkedNioStream(ReadableByteChannel in) {
46 this(in, ChunkedStream.DEFAULT_CHUNK_SIZE);
47 }
48
49 /**
50 * Creates a new instance that fetches data from the specified channel.
51 *
52 * @param chunkSize the number of bytes to fetch on each
53 * {@link #nextChunk()} call
54 */
55 public ChunkedNioStream(ReadableByteChannel in, int chunkSize) {
56 if (in == null) {
57 throw new NullPointerException("in");
58 }
59 if (chunkSize <= 0) {
60 throw new IllegalArgumentException("chunkSize: " + chunkSize +
61 " (expected: a positive integer)");
62 }
63 this.in = in;
64 offset = 0;
65 this.chunkSize = chunkSize;
66 byteBuffer = ByteBuffer.allocate(chunkSize);
67 }
68
69 /**
70 * Returns the number of transferred bytes.
71 */
72 public long getTransferredBytes() {
73 return offset;
74 }
75
76 public boolean hasNextChunk() throws Exception {
77 if (byteBuffer.position() > 0) {
78 // A previous read was not over, so there is a next chunk in the buffer at least
79 return true;
80 }
81 if (in.isOpen()) {
82 // Try to read a new part, and keep this part (no rewind)
83 int b = in.read(byteBuffer);
84 if (b < 0) {
85 return false;
86 } else {
87 offset += b;
88 return true;
89 }
90 }
91 return false;
92 }
93
94 public boolean isEndOfInput() throws Exception {
95 return !hasNextChunk();
96 }
97
98 public void close() throws Exception {
99 in.close();
100 }
101
102 public Object nextChunk() throws Exception {
103 if (!hasNextChunk()) {
104 return null;
105 }
106 // buffer cannot be not be empty from there
107 int readBytes = byteBuffer.position();
108 for (;;) {
109 int localReadBytes = in.read(byteBuffer);
110 if (localReadBytes < 0) {
111 break;
112 }
113 readBytes += localReadBytes;
114 offset += localReadBytes;
115
116 if (readBytes == chunkSize) {
117 break;
118 }
119 }
120 byteBuffer.flip();
121 // copy since buffer is keeped for next usage
122 ChannelBuffer buffer = copiedBuffer(byteBuffer);
123 byteBuffer.clear();
124 return buffer;
125 }
126 }