1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.stream;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.util.internal.ObjectUtil;
22
23 import java.io.InputStream;
24 import java.io.PushbackInputStream;
25
26
27
28
29
30
31
32
33
34
35
36 public class ChunkedStream implements ChunkedInput<ByteBuf> {
37
38 static final int DEFAULT_CHUNK_SIZE = 8192;
39
40 private final PushbackInputStream in;
41 private final int chunkSize;
42 private long offset;
43 private boolean closed;
44
45
46
47
48 public ChunkedStream(InputStream in) {
49 this(in, DEFAULT_CHUNK_SIZE);
50 }
51
52
53
54
55
56
57
58 public ChunkedStream(InputStream in, int chunkSize) {
59 ObjectUtil.checkNotNull(in, "in");
60 ObjectUtil.checkPositive(chunkSize, "chunkSize");
61
62 if (in instanceof PushbackInputStream) {
63 this.in = (PushbackInputStream) in;
64 } else {
65 this.in = new PushbackInputStream(in);
66 }
67 this.chunkSize = chunkSize;
68 }
69
70
71
72
73 public long transferredBytes() {
74 return offset;
75 }
76
77 @Override
78 public boolean isEndOfInput() throws Exception {
79 if (closed) {
80 return true;
81 }
82 if (in.available() > 0) {
83 return false;
84 }
85
86 int b = in.read();
87 if (b < 0) {
88 return true;
89 } else {
90 in.unread(b);
91 return false;
92 }
93 }
94
95 @Override
96 public void close() throws Exception {
97 closed = true;
98 in.close();
99 }
100
101 @Deprecated
102 @Override
103 public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
104 return readChunk(ctx.alloc());
105 }
106
107 @Override
108 public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
109 if (isEndOfInput()) {
110 return null;
111 }
112
113 final int availableBytes = in.available();
114 final int chunkSize;
115 if (availableBytes <= 0) {
116 chunkSize = this.chunkSize;
117 } else {
118 chunkSize = Math.min(this.chunkSize, in.available());
119 }
120
121 boolean release = true;
122 ByteBuf buffer = allocator.buffer(chunkSize);
123 try {
124
125 int written = buffer.writeBytes(in, chunkSize);
126 if (written < 0) {
127 return null;
128 }
129 offset += written;
130 release = false;
131 return buffer;
132 } finally {
133 if (release) {
134 buffer.release();
135 }
136 }
137 }
138
139 @Override
140 public long length() {
141 return -1;
142 }
143
144 @Override
145 public long progress() {
146 return offset;
147 }
148 }