1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.stream;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19 import static io.netty.util.internal.ObjectUtil.checkNotNull;
20
21 import io.netty.buffer.ByteBuf;
22 import io.netty.buffer.ByteBufAllocator;
23 import io.netty.channel.ChannelHandlerContext;
24
25 import java.nio.ByteBuffer;
26 import java.nio.channels.ReadableByteChannel;
27
28
29
30
31
32
33 public class ChunkedNioStream implements ChunkedInput<ByteBuf> {
34
35 private final ReadableByteChannel in;
36
37 private final int chunkSize;
38 private long offset;
39
40
41
42
43 private final ByteBuffer byteBuffer;
44
45
46
47
48 public ChunkedNioStream(ReadableByteChannel in) {
49 this(in, ChunkedStream.DEFAULT_CHUNK_SIZE);
50 }
51
52
53
54
55
56
57
58 public ChunkedNioStream(ReadableByteChannel in, int chunkSize) {
59 this.in = checkNotNull(in, "in");
60 this.chunkSize = checkPositive(chunkSize, "chunkSize");
61 byteBuffer = ByteBuffer.allocate(chunkSize);
62 }
63
64
65
66
67 public long transferredBytes() {
68 return offset;
69 }
70
71 @Override
72 public boolean isEndOfInput() throws Exception {
73 if (byteBuffer.position() > 0) {
74
75 return false;
76 }
77 if (in.isOpen()) {
78
79 int b = in.read(byteBuffer);
80 if (b < 0) {
81 return true;
82 } else {
83 offset += b;
84 return false;
85 }
86 }
87 return true;
88 }
89
90 @Override
91 public void close() throws Exception {
92 in.close();
93 }
94
95 @Deprecated
96 @Override
97 public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
98 return readChunk(ctx.alloc());
99 }
100
101 @Override
102 public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
103 if (isEndOfInput()) {
104 return null;
105 }
106
107 int readBytes = byteBuffer.position();
108 for (;;) {
109 int localReadBytes = in.read(byteBuffer);
110 if (localReadBytes < 0) {
111 break;
112 }
113 readBytes += localReadBytes;
114 offset += localReadBytes;
115 if (readBytes == chunkSize) {
116 break;
117 }
118 }
119 byteBuffer.flip();
120 boolean release = true;
121 ByteBuf buffer = allocator.buffer(byteBuffer.remaining());
122 try {
123 buffer.writeBytes(byteBuffer);
124 byteBuffer.clear();
125 release = false;
126 return buffer;
127 } finally {
128 if (release) {
129 buffer.release();
130 }
131 }
132 }
133
134 @Override
135 public long length() {
136 return -1;
137 }
138
139 @Override
140 public long progress() {
141 return offset;
142 }
143 }