查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.stomp;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.ChannelHandlerContext;
21  import io.netty.handler.codec.DecoderException;
22  import io.netty.handler.codec.DecoderResult;
23  import io.netty.handler.codec.ReplayingDecoder;
24  import io.netty.handler.codec.TooLongFrameException;
25  import io.netty.handler.codec.stomp.StompSubframeDecoder.State;
26  import io.netty.util.ByteProcessor;
27  import io.netty.util.internal.AppendableCharSequence;
28  import io.netty.util.internal.StringUtil;
29  import io.netty.util.internal.UnstableApi;
30  
31  import java.util.List;
32  
33  import static io.netty.buffer.ByteBufUtil.*;
34  import static io.netty.util.internal.ObjectUtil.*;
35  
36  /**
37   * Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and {@link StompContentSubframe}s.
38   *
39   * <h3>Parameters to control memory consumption: </h3>
 * {@code maxLineLength} The maximum length of a line - restricts the length of command and header lines. If the
 * length of a line exceeds this value, a {@link TooLongFrameException} will be raised.
 * <br>
 * {@code maxChunkSize} The maximum length of the content or each chunk. If the content length (or the length of each
 * chunk) exceeds this value, the content or chunk will be split into multiple {@link StompContentSubframe}s whose
 * length is {@code maxChunkSize} at maximum.
46   *
47   * <h3>Chunked Content</h3>
48   * <p>
 * If the content of a STOMP message is greater than {@code maxChunkSize}, this decoder generates multiple
 * {@link StompContentSubframe} instances to avoid excessive memory consumption. Note that every message, even one
 * with no content, is decoded with a {@link LastStompContentSubframe} at the end to simplify upstream message
 * parsing.
53   */
54  public class StompSubframeDecoder extends ReplayingDecoder<State> {
55  
56      private static final int DEFAULT_CHUNK_SIZE = 8132;
57      private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
58  
59      @UnstableApi
60      public enum State {
61          SKIP_CONTROL_CHARACTERS,
62          READ_HEADERS,
63          READ_CONTENT,
64          FINALIZE_FRAME_READ,
65          BAD_FRAME,
66          INVALID_CHUNK
67      }
68  
69      private final Utf8LineParser commandParser;
70      private final HeaderParser headerParser;
71      private final int maxChunkSize;
72      private int alreadyReadChunkSize;
73      private LastStompContentSubframe lastContent;
74      private long contentLength = -1;
75  
76      public StompSubframeDecoder() {
77          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
78      }
79  
80      public StompSubframeDecoder(boolean validateHeaders) {
81          this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE, validateHeaders);
82      }
83  
84      public StompSubframeDecoder(int maxLineLength, int maxChunkSize) {
85          this(maxLineLength, maxChunkSize, false);
86      }
87  
88      public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
89          super(State.SKIP_CONTROL_CHARACTERS);
90          checkPositive(maxLineLength, "maxLineLength");
91          checkPositive(maxChunkSize, "maxChunkSize");
92          this.maxChunkSize = maxChunkSize;
93          commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength);
94          headerParser = new HeaderParser(new AppendableCharSequence(128), maxLineLength, validateHeaders);
95      }
96  
97      @Override
98      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
99          switch (state()) {
100             case SKIP_CONTROL_CHARACTERS:
101                 skipControlCharacters(in);
102                 checkpoint(State.READ_HEADERS);
103                 // Fall through.
104             case READ_HEADERS:
105                 StompCommand command = StompCommand.UNKNOWN;
106                 StompHeadersSubframe frame = null;
107                 try {
108                     command = readCommand(in);
109                     frame = new DefaultStompHeadersSubframe(command);
110                     checkpoint(readHeaders(in, frame));
111                     out.add(frame);
112                 } catch (Exception e) {
113                     if (frame == null) {
114                         frame = new DefaultStompHeadersSubframe(command);
115                     }
116                     frame.setDecoderResult(DecoderResult.failure(e));
117                     out.add(frame);
118                     checkpoint(State.BAD_FRAME);
119                     return;
120                 }
121                 break;
122             case BAD_FRAME:
123                 in.skipBytes(actualReadableBytes());
124                 return;
125         }
126         try {
127             switch (state()) {
128                 case READ_CONTENT:
129                     int toRead = in.readableBytes();
130                     if (toRead == 0) {
131                         return;
132                     }
133                     if (toRead > maxChunkSize) {
134                         toRead = maxChunkSize;
135                     }
136                     if (contentLength >= 0) {
137                         int remainingLength = (int) (contentLength - alreadyReadChunkSize);
138                         if (toRead > remainingLength) {
139                             toRead = remainingLength;
140                         }
141                         ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
142                         if ((alreadyReadChunkSize += toRead) >= contentLength) {
143                             lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
144                             checkpoint(State.FINALIZE_FRAME_READ);
145                         } else {
146                             out.add(new DefaultStompContentSubframe(chunkBuffer));
147                             return;
148                         }
149                     } else {
150                         int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL);
151                         if (nulIndex == in.readerIndex()) {
152                             checkpoint(State.FINALIZE_FRAME_READ);
153                         } else {
154                             if (nulIndex > 0) {
155                                 toRead = nulIndex - in.readerIndex();
156                             } else {
157                                 toRead = in.writerIndex() - in.readerIndex();
158                             }
159                             ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
160                             alreadyReadChunkSize += toRead;
161                             if (nulIndex > 0) {
162                                 lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
163                                 checkpoint(State.FINALIZE_FRAME_READ);
164                             } else {
165                                 out.add(new DefaultStompContentSubframe(chunkBuffer));
166                                 return;
167                             }
168                         }
169                     }
170                     // Fall through.
171                 case FINALIZE_FRAME_READ:
172                     skipNullCharacter(in);
173                     if (lastContent == null) {
174                         lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
175                     }
176                     out.add(lastContent);
177                     resetDecoder();
178             }
179         } catch (Exception e) {
180             if (lastContent != null) {
181                 lastContent.release();
182                 lastContent = null;
183             }
184 
185             StompContentSubframe errorContent = new DefaultLastStompContentSubframe(Unpooled.EMPTY_BUFFER);
186             errorContent.setDecoderResult(DecoderResult.failure(e));
187             out.add(errorContent);
188             checkpoint(State.BAD_FRAME);
189         }
190     }
191 
192     private StompCommand readCommand(ByteBuf in) {
193         CharSequence commandSequence = commandParser.parse(in);
194         if (commandSequence == null) {
195             throw new DecoderException("Failed to read command from channel");
196         }
197         String commandStr = commandSequence.toString();
198         try {
199             return StompCommand.valueOf(commandStr);
200         } catch (IllegalArgumentException iae) {
201             throw new DecoderException("Cannot to parse command " + commandStr);
202         }
203     }
204 
205     private State readHeaders(ByteBuf buffer, StompHeadersSubframe headersSubframe) {
206         StompHeaders headers = headersSubframe.headers();
207         for (;;) {
208             boolean headerRead = headerParser.parseHeader(headersSubframe, buffer);
209             if (!headerRead) {
210                 if (headers.contains(StompHeaders.CONTENT_LENGTH)) {
211                     contentLength = getContentLength(headers);
212                     if (contentLength == 0) {
213                         return State.FINALIZE_FRAME_READ;
214                     }
215                 }
216                 return State.READ_CONTENT;
217             }
218         }
219     }
220 
221     private static long getContentLength(StompHeaders headers) {
222         long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, 0L);
223         if (contentLength < 0) {
224             throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative");
225         }
226         return contentLength;
227     }
228 
229     private static void skipNullCharacter(ByteBuf buffer) {
230         byte b = buffer.readByte();
231         if (b != StompConstants.NUL) {
232             throw new IllegalStateException("unexpected byte in buffer " + b + " while expecting NULL byte");
233         }
234     }
235 
236     private static void skipControlCharacters(ByteBuf buffer) {
237         byte b;
238         for (;;) {
239             b = buffer.readByte();
240             if (b != StompConstants.CR && b != StompConstants.LF) {
241                 buffer.readerIndex(buffer.readerIndex() - 1);
242                 break;
243             }
244         }
245     }
246 
247     private void resetDecoder() {
248         checkpoint(State.SKIP_CONTROL_CHARACTERS);
249         contentLength = -1;
250         alreadyReadChunkSize = 0;
251         lastContent = null;
252     }
253 
254     private static class Utf8LineParser implements ByteProcessor {
255 
256         private final AppendableCharSequence charSeq;
257         private final int maxLineLength;
258 
259         private int lineLength;
260         private char interim;
261         private boolean nextRead;
262 
263         Utf8LineParser(AppendableCharSequence charSeq, int maxLineLength) {
264             this.charSeq = checkNotNull(charSeq, "charSeq");
265             this.maxLineLength = maxLineLength;
266         }
267 
268         AppendableCharSequence parse(ByteBuf byteBuf) {
269             reset();
270             int offset = byteBuf.forEachByte(this);
271             if (offset == -1) {
272                 return null;
273             }
274 
275             byteBuf.readerIndex(offset + 1);
276             return charSeq;
277         }
278 
279         AppendableCharSequence charSequence() {
280             return charSeq;
281         }
282 
283         @Override
284         public boolean process(byte nextByte) throws Exception {
285             if (nextByte == StompConstants.CR) {
286                 ++lineLength;
287                 return true;
288             }
289 
290             if (nextByte == StompConstants.LF) {
291                 return false;
292             }
293 
294             if (++lineLength > maxLineLength) {
295                 throw new TooLongFrameException("An STOMP line is larger than " + maxLineLength + " bytes.");
296             }
297 
298             // 1 byte   -   0xxxxxxx                    -  7 bits
299             // 2 byte   -   110xxxxx 10xxxxxx           -  11 bits
300             // 3 byte   -   1110xxxx 10xxxxxx 10xxxxxx  -  16 bits
301             if (nextRead) {
302                 interim |= (nextByte & 0x3F) << 6;
303                 nextRead = false;
304             } else if (interim != 0) { // flush 2 or 3 byte
305                 appendTo(charSeq, (char) (interim | (nextByte & 0x3F)));
306                 interim = 0;
307             } else if (nextByte >= 0) { // INITIAL BRANCH
308                 // The first 128 characters (US-ASCII) need one byte.
309                 appendTo(charSeq, (char) nextByte);
310             } else if ((nextByte & 0xE0) == 0xC0) {
311                 // The next 1920 characters need two bytes and we can define
312                 // a first byte by mask 110xxxxx.
313                 interim = (char) ((nextByte & 0x1F) << 6);
314             } else {
315                 // The rest of characters need three bytes.
316                 interim = (char) ((nextByte & 0x0F) << 12);
317                 nextRead = true;
318             }
319 
320             return true;
321         }
322 
323         protected void appendTo(AppendableCharSequence charSeq, char chr) {
324             charSeq.append(chr);
325         }
326 
327         protected void reset() {
328             charSeq.reset();
329             lineLength = 0;
330             interim = 0;
331             nextRead = false;
332         }
333     }
334 
335     private static final class HeaderParser extends Utf8LineParser {
336 
337         private final boolean validateHeaders;
338 
339         private String name;
340         private boolean valid;
341 
342         private boolean shouldUnescape;
343         private boolean unescapeInProgress;
344 
345         HeaderParser(AppendableCharSequence charSeq, int maxLineLength, boolean validateHeaders) {
346             super(charSeq, maxLineLength);
347             this.validateHeaders = validateHeaders;
348         }
349 
350         boolean parseHeader(StompHeadersSubframe headersSubframe, ByteBuf buf) {
351             shouldUnescape = shouldUnescape(headersSubframe.command());
352             AppendableCharSequence value = super.parse(buf);
353             if (value == null || (name == null && value.length() == 0)) {
354                 return false;
355             }
356 
357             if (valid) {
358                 headersSubframe.headers().add(name, value.toString());
359             } else if (validateHeaders) {
360                 if (StringUtil.isNullOrEmpty(name)) {
361                     throw new IllegalArgumentException("received an invalid header line '" + value + '\'');
362                 }
363                 String line = name + ':' + value;
364                 throw new IllegalArgumentException("a header value or name contains a prohibited character ':'"
365                                                    + ", " + line);
366             }
367             return true;
368         }
369 
370         @Override
371         public boolean process(byte nextByte) throws Exception {
372             if (nextByte == StompConstants.COLON) {
373                 if (name == null) {
374                     AppendableCharSequence charSeq = charSequence();
375                     if (charSeq.length() != 0) {
376                         name = charSeq.substring(0, charSeq.length());
377                         charSeq.reset();
378                         valid = true;
379                         return true;
380                     } else {
381                         name = StringUtil.EMPTY_STRING;
382                     }
383                 } else {
384                     valid = false;
385                 }
386             }
387 
388             return super.process(nextByte);
389         }
390 
391         @Override
392         protected void appendTo(AppendableCharSequence charSeq, char chr) {
393             if (!shouldUnescape) {
394                 super.appendTo(charSeq, chr);
395                 return;
396             }
397 
398             if (chr == '\\') {
399                 if (unescapeInProgress) {
400                     super.appendTo(charSeq, chr);
401                     unescapeInProgress = false;
402                 } else {
403                     unescapeInProgress = true;
404                 }
405                 return;
406             }
407 
408             if (unescapeInProgress) {
409                 if (chr == 'c') {
410                     charSeq.append(':');
411                 } else if (chr == 'r') {
412                     charSeq.append('\r');
413                 } else if (chr == 'n') {
414                     charSeq.append('\n');
415                 } else {
416                     charSeq.append('\\').append(chr);
417                     throw new IllegalArgumentException("received an invalid escape header sequence '" + charSeq + '\'');
418                 }
419 
420                 unescapeInProgress = false;
421                 return;
422             }
423 
424             super.appendTo(charSeq, chr);
425         }
426 
427         @Override
428         protected void reset() {
429             name = null;
430             valid = false;
431             unescapeInProgress = false;
432             super.reset();
433         }
434 
435         private static boolean shouldUnescape(StompCommand command) {
436             return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
437         }
438     }
439 }