查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.http;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
20  
21  import java.util.List;
22  import java.util.Map.Entry;
23  
24  import org.jboss.netty.buffer.ChannelBuffer;
25  import org.jboss.netty.buffer.ChannelBuffers;
26  import org.jboss.netty.buffer.CompositeChannelBuffer;
27  import org.jboss.netty.channel.ChannelHandler;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.channel.ChannelPipeline;
30  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
33  import org.jboss.netty.handler.codec.frame.TooLongFrameException;
34  import org.jboss.netty.util.CharsetUtil;
35  
36  /**
37   * A {@link ChannelHandler} that aggregates an {@link HttpMessage}
38   * and its following {@link HttpChunk}s into a single {@link HttpMessage} with
39   * no following {@link HttpChunk}s.  It is useful when you don't want to take
40   * care of HTTP messages whose transfer encoding is 'chunked'.  Insert this
41   * handler after {@link HttpMessageDecoder} in the {@link ChannelPipeline}:
42   * <pre>
43   * {@link ChannelPipeline} p = ...;
44   * ...
45   * p.addLast("decoder", new {@link HttpRequestDecoder}());
46   * p.addLast("aggregator", <b>new {@link HttpChunkAggregator}(1048576)</b>);
47   * ...
48   * p.addLast("encoder", new {@link HttpResponseEncoder}());
49   * p.addLast("handler", new HttpRequestHandler());
50   * </pre>
51   * @apiviz.landmark
52   * @apiviz.has org.jboss.netty.handler.codec.http.HttpChunk oneway - - filters out
53   */
54  public class HttpChunkAggregator extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler {
55      public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024;
56  
57      private static final ChannelBuffer CONTINUE = ChannelBuffers.copiedBuffer(
58              "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII);
59  
60      private final int maxContentLength;
61      private HttpMessage currentMessage;
62      private boolean tooLongFrameFound;
63      private ChannelHandlerContext ctx;
64  
65      private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS;
66  
67      /**
68       * Creates a new instance.
69       *
70       * @param maxContentLength
71       *        the maximum length of the aggregated content.
72       *        If the length of the aggregated content exceeds this value,
73       *        a {@link TooLongFrameException} will be raised.
74       */
75      public HttpChunkAggregator(int maxContentLength) {
76          if (maxContentLength <= 0) {
77              throw new IllegalArgumentException(
78                      "maxContentLength must be a positive integer: " +
79                      maxContentLength);
80          }
81          this.maxContentLength = maxContentLength;
82      }
83  
84      /**
85       * Returns the maximum number of components in the cumulation buffer.  If the number of
86       * the components in the cumulation buffer exceeds this value, the components of the
87       * cumulation buffer are consolidated into a single component, involving memory copies.
88       * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}.
89       */
90      public final int getMaxCumulationBufferComponents() {
91          return maxCumulationBufferComponents;
92      }
93  
94      /**
95       * Sets the maximum number of components in the cumulation buffer.  If the number of
96       * the components in the cumulation buffer exceeds this value, the components of the
97       * cumulation buffer are consolidated into a single component, involving memory copies.
98       * The default value of this property is {@link #DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS}
99       * and its minimum allowed value is {@code 2}.
100      */
101     public final void setMaxCumulationBufferComponents(int maxCumulationBufferComponents) {
102         if (maxCumulationBufferComponents < 2) {
103             throw new IllegalArgumentException(
104                     "maxCumulationBufferComponents: " + maxCumulationBufferComponents +
105                     " (expected: >= 2)");
106         }
107 
108         if (ctx == null) {
109             this.maxCumulationBufferComponents = maxCumulationBufferComponents;
110         } else {
111             throw new IllegalStateException(
112                     "decoder properties cannot be changed once the decoder is added to a pipeline.");
113         }
114     }
115 
116     @Override
117     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
118             throws Exception {
119 
120         Object msg = e.getMessage();
121         HttpMessage currentMessage = this.currentMessage;
122 
123         if (msg instanceof HttpMessage) {
124             HttpMessage m = (HttpMessage) msg;
125             tooLongFrameFound = false;
126 
127             // Handle the 'Expect: 100-continue' header if necessary.
128             // TODO: Respond with 413 Request Entity Too Large
129             //   and discard the traffic or close the connection.
130             //       No need to notify the upstream handlers - just log.
131             //       If decoding a response, just throw an exception.
132             if (is100ContinueExpected(m)) {
133                 write(ctx, succeededFuture(ctx.getChannel()), CONTINUE.duplicate());
134             }
135 
136             if (m.isChunked()) {
137                 // A chunked message - remove 'Transfer-Encoding' header,
138                 // initialize the cumulative buffer, and wait for incoming chunks.
139                 HttpCodecUtil.removeTransferEncodingChunked(m);
140                 m.setChunked(false);
141                 this.currentMessage = m;
142             } else {
143                 // Not a chunked message - pass through.
144                 this.currentMessage = null;
145                 ctx.sendUpstream(e);
146             }
147         } else if (msg instanceof HttpChunk) {
148             // Sanity check
149             if (currentMessage == null) {
150                 throw new IllegalStateException(
151                         "received " + HttpChunk.class.getSimpleName() +
152                         " without " + HttpMessage.class.getSimpleName());
153             }
154             HttpChunk chunk = (HttpChunk) msg;
155 
156             if (tooLongFrameFound) {
157                 if (chunk.isLast()) {
158                     this.currentMessage = null;
159                 }
160                 return;
161             }
162 
163             // Merge the received chunk into the content of the current message.
164             ChannelBuffer content = currentMessage.getContent();
165 
166             if (content.readableBytes() > maxContentLength - chunk.getContent().readableBytes()) {
167                 tooLongFrameFound = true;
168 
169                 throw new TooLongFrameException(
170                         "HTTP content length exceeded " + maxContentLength +
171                         " bytes.");
172             }
173 
174             // Append the content of the chunk
175             appendToCumulation(chunk.getContent());
176 
177             if (chunk.isLast()) {
178                 this.currentMessage = null;
179 
180                 // Merge trailing headers into the message.
181                 if (chunk instanceof HttpChunkTrailer) {
182                     HttpChunkTrailer trailer = (HttpChunkTrailer) chunk;
183                     for (Entry<String, String> header: trailer.trailingHeaders()) {
184                         currentMessage.headers().set(header.getKey(), header.getValue());
185                     }
186                 }
187 
188                 // Set the 'Content-Length' header.
189                 currentMessage.headers().set(
190                         HttpHeaders.Names.CONTENT_LENGTH,
191                         String.valueOf(content.readableBytes()));
192 
193                 // All done - generate the event.
194                 fireMessageReceived(ctx, currentMessage, e.getRemoteAddress());
195             }
196         } else {
197             // Neither HttpMessage or HttpChunk
198             ctx.sendUpstream(e);
199         }
200     }
201 
202     protected void appendToCumulation(ChannelBuffer input) {
203         ChannelBuffer cumulation = currentMessage.getContent();
204         if (cumulation instanceof CompositeChannelBuffer) {
205             // Make sure the resulting cumulation buffer has no more than the configured components.
206             CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
207             if (composite.numComponents() >= maxCumulationBufferComponents) {
208                 currentMessage.setContent(ChannelBuffers.wrappedBuffer(composite.copy(), input));
209             } else {
210                 List<ChannelBuffer> decomposed = composite.decompose(0, composite.readableBytes());
211                 ChannelBuffer[] buffers = decomposed.toArray(new ChannelBuffer[decomposed.size() + 1]);
212                 buffers[buffers.length - 1] = input;
213 
214                 currentMessage.setContent(ChannelBuffers.wrappedBuffer(buffers));
215             }
216         } else {
217             currentMessage.setContent(ChannelBuffers.wrappedBuffer(cumulation, input));
218         }
219     }
220 
221     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
222         this.ctx = ctx;
223     }
224 
225     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
226         // noop
227     }
228 
229     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
230         // noop
231     }
232 
233     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
234         // noop
235     }
236 }