查看本类的 API文档回源码主页即时通讯网 - 即时通讯开发者社区!
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *  
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *  
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License. 
18   *  
19   */
20  package org.apache.mina.filter.codec;
21  
22  import org.apache.mina.common.ByteBuffer;
23  import org.apache.mina.common.IoSession;
24  
25  /**
26   * A {@link ProtocolDecoder} that cumulates the content of received
27   * buffers to a <em>cumulative buffer</em> to help users implement decoders.
28   * <p>
29   * If the received {@link ByteBuffer} is only a part of a message,
30   * decoders should cumulate received buffers to make a message complete or
31   * to postpone decoding until more buffers arrive.
32   * <p>
33   * Here is an example decoder that decodes CRLF terminated lines into 
34   * <code>Command</code> objects:
35   * <pre>
36   * public class CRLFTerminatedCommandLineDecoder 
37   *         extends CumulativeProtocolDecoder {
38   * 
39   *     private Command parseCommand(ByteBuffer in) {
40   *         // Convert the bytes in the specified buffer to a 
41   *         // Command object.
42   *         ...
43   *     }
44   * 
45   *     protected boolean doDecode(IoSession session, ByteBuffer in,
46   *                                ProtocolDecoderOutput out) 
47   *             throws Exception {
48   * 
49   *         // Remember the initial position.
50   *         int start = in.position();
51   *        
52   *         // Now find the first CRLF in the buffer.
53   *         byte previous = 0;
54   *         while (in.hasRemaining()) {
55   *             byte current = in.get();
56   *            
57   *             if (previous == '\r' && current == '\n') {
58   *                 // Remember the current position and limit.
59   *                 int position = in.position();
60   *                 int limit = in.limit();
61   *                 try {
62   *                     in.position(start);
63   *                     in.limit(position);
64   *                     // The bytes between in.position() and in.limit()
65   *                     // now contain a full CRLF terminated line.
66   *                     out.write(parseCommand(in.slice()));
67   *                 } finally {
68   *                     // Set the position to point right after the
69   *                     // detected line and set the limit to the old
70   *                     // one.
71   *                     in.position(position);
72   *                     in.limit(limit);
73   *                 }
74   *                 // Decoded one line; CumulativeProtocolDecoder will  
75   *                 // call me again until I return false. So just 
76   *                 // return true until there are no more lines in the 
77   *                 // buffer.
78   *                 return true;
79   *             }
80   *            
81   *             previous = current;
82   *         }
83   *         
84   *         // Could not find CRLF in the buffer. Reset the initial 
85   *         // position to the one we recorded above.
86   *         in.position(start);
87   *        
88   *         return false;
89   *     }
90   * }
91   * </pre>
92   * 
93   * @author The Apache Directory Project (mina-dev@directory.apache.org)
94   * @version $Rev: 555855 $, $Date: 2007-07-13 12:19:00 +0900 (Fri, 13 Jul 2007) $
95   */
96  public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
97  
98      private static final String BUFFER = CumulativeProtocolDecoder.class
99              .getName()
100             + ".Buffer";
101 
102     /**
103      * Creates a new instance.
104      */
105     protected CumulativeProtocolDecoder() {
106     }
107 
108     /**
109      * Cumulates content of <tt>in</tt> into internal buffer and forwards
110      * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
111      * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
112      * and the cumulative buffer is compacted after decoding ends.
113      * 
114      * @throws IllegalStateException if your <tt>doDecode()</tt> returned
115      *                               <tt>true</tt> not consuming the cumulative buffer.
116      */
117     public void decode(IoSession session, ByteBuffer in,
118             ProtocolDecoderOutput out) throws Exception {
119         boolean usingSessionBuffer = true;
120         ByteBuffer buf = (ByteBuffer) session.getAttribute(BUFFER);
121         // If we have a session buffer, append data to that; otherwise
122         // use the buffer read from the network directly.
123         if (buf != null) {
124             buf.put(in);
125             buf.flip();
126         } else {
127             buf = in;
128             usingSessionBuffer = false;
129         }
130 
131         for (;;) {
132             int oldPos = buf.position();
133             boolean decoded = doDecode(session, buf, out);
134             if (decoded) {
135                 if (buf.position() == oldPos) {
136                     throw new IllegalStateException(
137                             "doDecode() can't return true when buffer is not consumed.");
138                 }
139 
140                 if (!buf.hasRemaining()) {
141                     break;
142                 }
143             } else {
144                 break;
145             }
146         }
147 
148         // if there is any data left that cannot be decoded, we store
149         // it in a buffer in the session and next time this decoder is
150         // invoked the session buffer gets appended to
151         if (buf.hasRemaining()) {
152             if (usingSessionBuffer)
153                 buf.compact();
154             else
155                 storeRemainingInSession(buf, session);
156         } else {
157             if (usingSessionBuffer)
158                 removeSessionBuffer(session);
159         }
160     }
161 
162     /**
163      * Implement this method to consume the specified cumulative buffer and
164      * decode its content into message(s). 
165      *  
166      * @param in the cumulative buffer
167      * @return <tt>true</tt> if and only if there's more to decode in the buffer
168      *         and you want to have <tt>doDecode</tt> method invoked again.
169      *         Return <tt>false</tt> if remaining data is not enough to decode,
170      *         then this method will be invoked again when more data is cumulated.
171      * @throws Exception if cannot decode <tt>in</tt>.
172      */
173     protected abstract boolean doDecode(IoSession session, ByteBuffer in,
174             ProtocolDecoderOutput out) throws Exception;
175 
176     /**
177      * Releases the cumulative buffer used by the specified <tt>session</tt>.
178      * Please don't forget to call <tt>super.dispose( session )</tt> when
179      * you override this method.
180      */
181     public void dispose(IoSession session) throws Exception {
182         removeSessionBuffer(session);
183     }
184 
185     private void removeSessionBuffer(IoSession session) {
186         ByteBuffer buf = (ByteBuffer) session.removeAttribute(BUFFER);
187         if (buf != null) {
188             buf.release();
189         }
190     }
191 
192     private void storeRemainingInSession(ByteBuffer buf, IoSession session) {
193         ByteBuffer remainingBuf = ByteBuffer.allocate(buf.capacity());
194         remainingBuf.setAutoExpand(true);
195         remainingBuf.order(buf.order());
196         remainingBuf.put(buf);
197         session.setAttribute(BUFFER, remainingBuf);
198     }
199 }