1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.filter.codec;
21
22 import org.apache.mina.core.buffer.IoBuffer;
23 import org.apache.mina.core.service.TransportMetadata;
24 import org.apache.mina.core.session.AttributeKey;
25 import org.apache.mina.core.session.IoSession;
26
27 /**
28 * A {@link ProtocolDecoder} that cumulates the content of received buffers to a
29 * <em>cumulative buffer</em> to help users implement decoders.
30 * <p>
31 * If the received {@link IoBuffer} is only a part of a message, decoders should
32 * cumulate received buffers to make a message complete or to postpone decoding
33 * until more buffers arrive.
34 * <p>
35 * Here is an example decoder that decodes CRLF terminated lines into
36 * <code>Command</code> objects:
37 *
38 * <pre>
39 * public class CrLfTerminatedCommandLineDecoder extends CumulativeProtocolDecoder {
40 *
41 * private Command parseCommand(IoBuffer in) {
42 * // Convert the bytes in the specified buffer to a
43 * // Command object.
44 * ...
45 * }
46 *
47 * protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
48 *
49 * // Remember the initial position.
50 * int start = in.position();
51 *
52 * // Now find the first CRLF in the buffer.
53 * byte previous = 0;
54 * while (in.hasRemaining()) {
55 * byte current = in.get();
56 *
57 * if (previous == '\r' && current == '\n') {
58 * // Remember the current position and limit.
59 * int position = in.position();
60 * int limit = in.limit();
61 * try {
62 * in.position(start);
63 * in.limit(position);
64 * // The bytes between in.position() and in.limit()
65 * // now contain a full CRLF terminated line.
66 * out.write(parseCommand(in.slice()));
67 * } finally {
68 * // Set the position to point right after the
69 * // detected line and set the limit to the old
70 * // one.
71 * in.position(position);
72 * in.limit(limit);
73 * }
74 * // Decoded one line; CumulativeProtocolDecoder will
75 * // call me again until I return false. So just
76 * // return true until there are no more lines in the
77 * // buffer.
78 * return true;
79 * }
80 *
81 * previous = current;
82 * }
83 *
84 * // Could not find CRLF in the buffer. Reset the initial
85 * // position to the one we recorded above.
86 * in.position(start);
87 *
88 * return false;
89 * }
90 * }
91 * </pre>
92 * <p>
93 * Please note that this decoder simply forwards the call to
94 * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)} if the
95 * underlying transport doesn't have packet fragmentation. Whether the
96 * transport has fragmentation or not is determined by querying
97 * {@link TransportMetadata}.
98 *
99 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
100 */
101 public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
102
103 private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
104
105 /**
106 * Creates a new instance.
107 */
108 protected CumulativeProtocolDecoder() {
109 // Do nothing
110 }
111
112 /**
113 * Cumulates content of <tt>in</tt> into internal buffer and forwards
114 * decoding request to
115 * {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
116 * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
117 * and the cumulative buffer is compacted after decoding ends.
118 *
119 * @throws IllegalStateException
120 * if your <tt>doDecode()</tt> returned <tt>true</tt> not
121 * consuming the cumulative buffer.
122 */
123 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
124 if (!session.getTransportMetadata().hasFragmentation()) {
125 while (in.hasRemaining()) {
126 if (!doDecode(session, in, out)) {
127 break;
128 }
129 }
130
131 return;
132 }
133
134 boolean usingSessionBuffer = true;
135 IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
136 // If we have a session buffer, append data to that; otherwise
137 // use the buffer read from the network directly.
138 if (buf != null) {
139 boolean appended = false;
140 // Make sure that the buffer is auto-expanded.
141 if (buf.isAutoExpand()) {
142 try {
143 buf.put(in);
144 appended = true;
145 } catch (IllegalStateException e) {
146 // A user called derivation method (e.g. slice()),
147 // which disables auto-expansion of the parent buffer.
148 } catch (IndexOutOfBoundsException e) {
149 // A user disabled auto-expansion.
150 }
151 }
152
153 if (appended) {
154 buf.flip();
155 } else {
156 // Reallocate the buffer if append operation failed due to
157 // derivation or disabled auto-expansion.
158 buf.flip();
159 IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
160 newBuf.order(buf.order());
161 newBuf.put(buf);
162 newBuf.put(in);
163 newBuf.flip();
164 buf = newBuf;
165
166 // Update the session attribute.
167 session.setAttribute(BUFFER, buf);
168 }
169 } else {
170 buf = in;
171 usingSessionBuffer = false;
172 }
173
174 for (;;) {
175 int oldPos = buf.position();
176 boolean decoded = doDecode(session, buf, out);
177 if (decoded) {
178 if (buf.position() == oldPos) {
179 throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
180 }
181
182 if (!buf.hasRemaining()) {
183 break;
184 }
185 } else {
186 break;
187 }
188 }
189
190 // if there is any data left that cannot be decoded, we store
191 // it in a buffer in the session and next time this decoder is
192 // invoked the session buffer gets appended to
193 if (buf.hasRemaining()) {
194 if (usingSessionBuffer && buf.isAutoExpand()) {
195 buf.compact();
196 } else {
197 storeRemainingInSession(buf, session);
198 }
199 } else {
200 if (usingSessionBuffer) {
201 removeSessionBuffer(session);
202 }
203 }
204 }
205
206 /**
207 * Implement this method to consume the specified cumulative buffer and
208 * decode its content into message(s).
209 *
210 * @param session The current Session
211 * @param in the cumulative buffer
212 * @param out The {@link ProtocolDecoderOutput} that will receive the decoded message
213 * @return <tt>true</tt> if and only if there's more to decode in the buffer
214 * and you want to have <tt>doDecode</tt> method invoked again.
215 * Return <tt>false</tt> if remaining data is not enough to decode,
216 * then this method will be invoked again when more data is
217 * cumulated.
218 * @throws Exception if cannot decode <tt>in</tt>.
219 */
220 protected abstract boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception;
221
222 /**
223 * Releases the cumulative buffer used by the specified <tt>session</tt>.
224 * Please don't forget to call <tt>super.dispose( session )</tt> when you
225 * override this method.
226 */
227 @Override
228 public void dispose(IoSession session) throws Exception {
229 removeSessionBuffer(session);
230 }
231
232 private void removeSessionBuffer(IoSession session) {
233 session.removeAttribute(BUFFER);
234 }
235
236 private void storeRemainingInSession(IoBuffer buf, IoSession session) {
237 final IoBuffer remainingBuf = IoBuffer.allocate(buf.capacity()).setAutoExpand(true);
238
239 remainingBuf.order(buf.order());
240 remainingBuf.put(buf);
241
242 session.setAttribute(BUFFER, remainingBuf);
243 }
244 }