1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.stomp;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.Unpooled;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.DecoderResult;
23 import io.netty.handler.codec.ReplayingDecoder;
24 import io.netty.handler.codec.TooLongFrameException;
25 import io.netty.handler.codec.stomp.StompSubframeDecoder.State;
26 import io.netty.util.ByteProcessor;
27 import io.netty.util.internal.AppendableCharSequence;
28 import io.netty.util.internal.StringUtil;
29 import io.netty.util.internal.UnstableApi;
30
31 import java.util.List;
32
33 import static io.netty.buffer.ByteBufUtil.*;
34 import static io.netty.util.internal.ObjectUtil.*;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 public class StompSubframeDecoder extends ReplayingDecoder<State> {
55
56 private static final int DEFAULT_CHUNK_SIZE = 8132;
57 private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
58
59 @UnstableApi
60 public enum State {
61 SKIP_CONTROL_CHARACTERS,
62 READ_HEADERS,
63 READ_CONTENT,
64 FINALIZE_FRAME_READ,
65 BAD_FRAME,
66 INVALID_CHUNK
67 }
68
69 private final Utf8LineParser commandParser;
70 private final HeaderParser headerParser;
71 private final int maxChunkSize;
72 private int alreadyReadChunkSize;
73 private LastStompContentSubframe lastContent;
74 private long contentLength = -1;
75
76 public StompSubframeDecoder() {
77 this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
78 }
79
80 public StompSubframeDecoder(boolean validateHeaders) {
81 this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE, validateHeaders);
82 }
83
84 public StompSubframeDecoder(int maxLineLength, int maxChunkSize) {
85 this(maxLineLength, maxChunkSize, false);
86 }
87
88 public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
89 super(State.SKIP_CONTROL_CHARACTERS);
90 checkPositive(maxLineLength, "maxLineLength");
91 checkPositive(maxChunkSize, "maxChunkSize");
92 this.maxChunkSize = maxChunkSize;
93 commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength);
94 headerParser = new HeaderParser(new AppendableCharSequence(128), maxLineLength, validateHeaders);
95 }
96
97 @Override
98 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
99 switch (state()) {
100 case SKIP_CONTROL_CHARACTERS:
101 skipControlCharacters(in);
102 checkpoint(State.READ_HEADERS);
103
104 case READ_HEADERS:
105 StompCommand command = StompCommand.UNKNOWN;
106 StompHeadersSubframe frame = null;
107 try {
108 command = readCommand(in);
109 frame = new DefaultStompHeadersSubframe(command);
110 checkpoint(readHeaders(in, frame));
111 out.add(frame);
112 } catch (Exception e) {
113 if (frame == null) {
114 frame = new DefaultStompHeadersSubframe(command);
115 }
116 frame.setDecoderResult(DecoderResult.failure(e));
117 out.add(frame);
118 checkpoint(State.BAD_FRAME);
119 return;
120 }
121 break;
122 case BAD_FRAME:
123 in.skipBytes(actualReadableBytes());
124 return;
125 }
126 try {
127 switch (state()) {
128 case READ_CONTENT:
129 int toRead = in.readableBytes();
130 if (toRead == 0) {
131 return;
132 }
133 if (toRead > maxChunkSize) {
134 toRead = maxChunkSize;
135 }
136 if (contentLength >= 0) {
137 int remainingLength = (int) (contentLength - alreadyReadChunkSize);
138 if (toRead > remainingLength) {
139 toRead = remainingLength;
140 }
141 ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
142 if ((alreadyReadChunkSize += toRead) >= contentLength) {
143 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
144 checkpoint(State.FINALIZE_FRAME_READ);
145 } else {
146 out.add(new DefaultStompContentSubframe(chunkBuffer));
147 return;
148 }
149 } else {
150 int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL);
151 if (nulIndex == in.readerIndex()) {
152 checkpoint(State.FINALIZE_FRAME_READ);
153 } else {
154 if (nulIndex > 0) {
155 toRead = nulIndex - in.readerIndex();
156 } else {
157 toRead = in.writerIndex() - in.readerIndex();
158 }
159 ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
160 alreadyReadChunkSize += toRead;
161 if (nulIndex > 0) {
162 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
163 checkpoint(State.FINALIZE_FRAME_READ);
164 } else {
165 out.add(new DefaultStompContentSubframe(chunkBuffer));
166 return;
167 }
168 }
169 }
170
171 case FINALIZE_FRAME_READ:
172 skipNullCharacter(in);
173 if (lastContent == null) {
174 lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
175 }
176 out.add(lastContent);
177 resetDecoder();
178 }
179 } catch (Exception e) {
180 if (lastContent != null) {
181 lastContent.release();
182 lastContent = null;
183 }
184
185 StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER);
186 errorContent.setDecoderResult(DecoderResult.failure(e));
187 out.add(errorContent);
188 checkpoint(State.BAD_FRAME);
189 }
190 }
191
192 private StompCommand readCommand(ByteBuf in) {
193 CharSequence commandSequence = commandParser.parse(in);
194 if (commandSequence == null) {
195 throw new DecoderException("Failed to read command from channel");
196 }
197 String commandStr = commandSequence.toString();
198 try {
199 return StompCommand.valueOf(commandStr);
200 } catch (IllegalArgumentException iae) {
201 throw new DecoderException("Cannot to parse command " + commandStr);
202 }
203 }
204
205 private State readHeaders(ByteBuf buffer, StompHeadersSubframe headersSubframe) {
206 StompHeaders headers = headersSubframe.headers();
207 for (;;) {
208 boolean headerRead = headerParser.parseHeader(headersSubframe, buffer);
209 if (!headerRead) {
210 if (headers.contains(StompHeaders.CONTENT_LENGTH)) {
211 contentLength = getContentLength(headers);
212 if (contentLength == 0) {
213 return State.FINALIZE_FRAME_READ;
214 }
215 }
216 return State.READ_CONTENT;
217 }
218 }
219 }
220
221 private static long getContentLength(StompHeaders headers) {
222 long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, 0L);
223 if (contentLength < 0) {
224 throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative");
225 }
226 return contentLength;
227 }
228
229 private static void skipNullCharacter(ByteBuf buffer) {
230 byte b = buffer.readByte();
231 if (b != StompConstants.NUL) {
232 throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
233 }
234 }
235
236 private static void skipControlCharacters(ByteBuf buffer) {
237 byte b;
238 for (;;) {
239 b = buffer.readByte();
240 if (b != StompConstants.CR && b != StompConstants.LF) {
241 buffer.readerIndex(buffer.readerIndex() - 1);
242 break;
243 }
244 }
245 }
246
247 private void resetDecoder() {
248 checkpoint(State.SKIP_CONTROL_CHARACTERS);
249 contentLength = -1;
250 alreadyReadChunkSize = 0;
251 lastContent = null;
252 }
253
254 private static class Utf8LineParser implements ByteProcessor {
255
256 private final AppendableCharSequence charSeq;
257 private final int maxLineLength;
258
259 private int lineLength;
260 private char interim;
261 private boolean nextRead;
262
263 Utf8LineParser(AppendableCharSequence charSeq, int maxLineLength) {
264 this.charSeq = checkNotNull(charSeq, "charSeq");
265 this.maxLineLength = maxLineLength;
266 }
267
268 AppendableCharSequence parse(ByteBuf byteBuf) {
269 reset();
270 int offset = byteBuf.forEachByte(this);
271 if (offset == -1) {
272 return null;
273 }
274
275 byteBuf.readerIndex(offset + 1);
276 return charSeq;
277 }
278
279 AppendableCharSequence charSequence() {
280 return charSeq;
281 }
282
283 @Override
284 public boolean process(byte nextByte) throws Exception {
285 if (nextByte == StompConstants.CR) {
286 ++lineLength;
287 return true;
288 }
289
290 if (nextByte == StompConstants.LF) {
291 return false;
292 }
293
294 if (++lineLength > maxLineLength) {
295 throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
296 }
297
298
299
300
301 if (nextRead) {
302 interim |= (nextByte & 0x3F) << 6;
303 nextRead = false;
304 } else if (interim != 0) {
305 appendTo(charSeq, (char) (interim | (nextByte & 0x3F)));
306 interim = 0;
307 } else if (nextByte >= 0) {
308
309 appendTo(charSeq, (char) nextByte);
310 } else if ((nextByte & 0xE0) == 0xC0) {
311
312
313 interim = (char) ((nextByte & 0x1F) << 6);
314 } else {
315
316 interim = (char) ((nextByte & 0x0F) << 12);
317 nextRead = true;
318 }
319
320 return true;
321 }
322
323 protected void appendTo(AppendableCharSequence charSeq, char chr) {
324 charSeq.append(chr);
325 }
326
327 protected void reset() {
328 charSeq.reset();
329 lineLength = 0;
330 interim = 0;
331 nextRead = false;
332 }
333 }
334
335 private static final class HeaderParser extends Utf8LineParser {
336
337 private final boolean validateHeaders;
338
339 private String name;
340 private boolean valid;
341
342 private boolean shouldUnescape;
343 private boolean unescapeInProgress;
344
345 HeaderParser(AppendableCharSequence charSeq, int maxLineLength, boolean validateHeaders) {
346 super(charSeq, maxLineLength);
347 this.validateHeaders = validateHeaders;
348 }
349
350 boolean parseHeader(StompHeadersSubframe headersSubframe, ByteBuf buf) {
351 shouldUnescape = shouldUnescape(headersSubframe.command());
352 AppendableCharSequence value = super.parse(buf);
353 if (value == null || (name == null && value.length() == 0)) {
354 return false;
355 }
356
357 if (valid) {
358 headersSubframe.headers().add(name, value.toString());
359 } else if (validateHeaders) {
360 if (StringUtil.isNullOrEmpty(name)) {
361 throw new IllegalArgumentException("received an invalid header line '" + value + '\'');
362 }
363 String line = name + ':' + value;
364 throw new IllegalArgumentException("a header value or name contains a prohibited character ':'"
365 + ", " + line);
366 }
367 return true;
368 }
369
370 @Override
371 public boolean process(byte nextByte) throws Exception {
372 if (nextByte == StompConstants.COLON) {
373 if (name == null) {
374 AppendableCharSequence charSeq = charSequence();
375 if (charSeq.length() != 0) {
376 name = charSeq.substring(0, charSeq.length());
377 charSeq.reset();
378 valid = true;
379 return true;
380 } else {
381 name = StringUtil.EMPTY_STRING;
382 }
383 } else {
384 valid = false;
385 }
386 }
387
388 return super.process(nextByte);
389 }
390
391 @Override
392 protected void appendTo(AppendableCharSequence charSeq, char chr) {
393 if (!shouldUnescape) {
394 super.appendTo(charSeq, chr);
395 return;
396 }
397
398 if (chr == '\\') {
399 if (unescapeInProgress) {
400 super.appendTo(charSeq, chr);
401 unescapeInProgress = false;
402 } else {
403 unescapeInProgress = true;
404 }
405 return;
406 }
407
408 if (unescapeInProgress) {
409 if (chr == 'c') {
410 charSeq.append(':');
411 } else if (chr == 'r') {
412 charSeq.append('\r');
413 } else if (chr == 'n') {
414 charSeq.append('\n');
415 } else {
416 charSeq.append('\\').append(chr);
417 throw new IllegalArgumentException("received an invalid escape header sequence '" + charSeq + '\'');
418 }
419
420 unescapeInProgress = false;
421 return;
422 }
423
424 super.appendTo(charSeq, chr);
425 }
426
427 @Override
428 protected void reset() {
429 name = null;
430 valid = false;
431 unescapeInProgress = false;
432 super.reset();
433 }
434
435 private static boolean shouldUnescape(StompCommand command) {
436 return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
437 }
438 }
439 }