1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.stomp;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufUtil;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.MessageToMessageEncoder;
22 import io.netty.util.concurrent.FastThreadLocal;
23 import io.netty.util.internal.AppendableCharSequence;
24
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map.Entry;
28
29 import static io.netty.handler.codec.stomp.StompConstants.NUL;
30 import static io.netty.handler.codec.stomp.StompHeaders.ACCEPT_VERSION;
31 import static io.netty.handler.codec.stomp.StompHeaders.ACK;
32 import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_LENGTH;
33 import static io.netty.handler.codec.stomp.StompHeaders.CONTENT_TYPE;
34 import static io.netty.handler.codec.stomp.StompHeaders.DESTINATION;
35 import static io.netty.handler.codec.stomp.StompHeaders.HEART_BEAT;
36 import static io.netty.handler.codec.stomp.StompHeaders.HOST;
37 import static io.netty.handler.codec.stomp.StompHeaders.ID;
38 import static io.netty.handler.codec.stomp.StompHeaders.LOGIN;
39 import static io.netty.handler.codec.stomp.StompHeaders.MESSAGE;
40 import static io.netty.handler.codec.stomp.StompHeaders.MESSAGE_ID;
41 import static io.netty.handler.codec.stomp.StompHeaders.PASSCODE;
42 import static io.netty.handler.codec.stomp.StompHeaders.RECEIPT;
43 import static io.netty.handler.codec.stomp.StompHeaders.RECEIPT_ID;
44 import static io.netty.handler.codec.stomp.StompHeaders.SERVER;
45 import static io.netty.handler.codec.stomp.StompHeaders.SESSION;
46 import static io.netty.handler.codec.stomp.StompHeaders.SUBSCRIPTION;
47 import static io.netty.handler.codec.stomp.StompHeaders.TRANSACTION;
48 import static io.netty.handler.codec.stomp.StompHeaders.VERSION;
49
50
51
52
53 public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> {
54
55 private static final int ESCAPE_HEADER_KEY_CACHE_LIMIT = 32;
56 private static final float DEFAULT_LOAD_FACTOR = 0.75f;
57 private static final FastThreadLocal<LinkedHashMap<CharSequence, CharSequence>> ESCAPE_HEADER_KEY_CACHE =
58 new FastThreadLocal<LinkedHashMap<CharSequence, CharSequence>>() {
59 @Override
60 protected LinkedHashMap<CharSequence, CharSequence> initialValue() throws Exception {
61 LinkedHashMap<CharSequence, CharSequence> cache = new LinkedHashMap<CharSequence, CharSequence>(
62 ESCAPE_HEADER_KEY_CACHE_LIMIT, DEFAULT_LOAD_FACTOR, true) {
63
64 @Override
65 protected boolean removeEldestEntry(Entry eldest) {
66 return size() > ESCAPE_HEADER_KEY_CACHE_LIMIT;
67 }
68 };
69
70 cache.put(ACCEPT_VERSION, ACCEPT_VERSION);
71 cache.put(HOST, HOST);
72 cache.put(LOGIN, LOGIN);
73 cache.put(PASSCODE, PASSCODE);
74 cache.put(HEART_BEAT, HEART_BEAT);
75 cache.put(VERSION, VERSION);
76 cache.put(SESSION, SESSION);
77 cache.put(SERVER, SERVER);
78 cache.put(DESTINATION, DESTINATION);
79 cache.put(ID, ID);
80 cache.put(ACK, ACK);
81 cache.put(TRANSACTION, TRANSACTION);
82 cache.put(RECEIPT, RECEIPT);
83 cache.put(MESSAGE_ID, MESSAGE_ID);
84 cache.put(SUBSCRIPTION, SUBSCRIPTION);
85 cache.put(RECEIPT_ID, RECEIPT_ID);
86 cache.put(MESSAGE, MESSAGE);
87 cache.put(CONTENT_LENGTH, CONTENT_LENGTH);
88 cache.put(CONTENT_TYPE, CONTENT_TYPE);
89
90 return cache;
91 }
92 };
93
94 @Override
95 protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
96 if (msg instanceof StompFrame) {
97 StompFrame stompFrame = (StompFrame) msg;
98 ByteBuf buf = encodeFullFrame(stompFrame, ctx);
99
100 out.add(convertFullFrame(stompFrame, buf));
101 } else if (msg instanceof StompHeadersSubframe) {
102 StompHeadersSubframe stompHeadersSubframe = (StompHeadersSubframe) msg;
103 ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(stompHeadersSubframe));
104 encodeHeaders(stompHeadersSubframe, buf);
105
106 out.add(convertHeadersSubFrame(stompHeadersSubframe, buf));
107 } else if (msg instanceof StompContentSubframe) {
108 StompContentSubframe stompContentSubframe = (StompContentSubframe) msg;
109 ByteBuf buf = encodeContent(stompContentSubframe, ctx);
110
111 out.add(convertContentSubFrame(stompContentSubframe, buf));
112 }
113 }
114
115
116
117
118
119
120
121 protected Object convertFullFrame(StompFrame original, ByteBuf encoded) {
122 return encoded;
123 }
124
125
126
127
128
129
130
131 protected Object convertHeadersSubFrame(StompHeadersSubframe original, ByteBuf encoded) {
132 return encoded;
133 }
134
135
136
137
138
139
140
141 protected Object convertContentSubFrame(StompContentSubframe original, ByteBuf encoded) {
142 return encoded;
143 }
144
145
146
147
148
149 protected int headersSubFrameSize(StompHeadersSubframe headersSubframe) {
150 int estimatedSize = headersSubframe.headers().size() * 34 + 48;
151 if (estimatedSize < 128) {
152 return 128;
153 }
154
155 return Math.max(estimatedSize, 256);
156 }
157
158 private ByteBuf encodeFullFrame(StompFrame frame, ChannelHandlerContext ctx) {
159 int contentReadableBytes = frame.content().readableBytes();
160 ByteBuf buf = ctx.alloc().buffer(headersSubFrameSize(frame) + contentReadableBytes);
161 encodeHeaders(frame, buf);
162
163 if (contentReadableBytes > 0) {
164 buf.writeBytes(frame.content());
165 }
166
167 return buf.writeByte(NUL);
168 }
169
170 private static void encodeHeaders(StompHeadersSubframe frame, ByteBuf buf) {
171 StompCommand command = frame.command();
172 ByteBufUtil.writeUtf8(buf, command.toString());
173 buf.writeByte(StompConstants.LF);
174
175 boolean shouldEscape = shouldEscape(command);
176 LinkedHashMap<CharSequence, CharSequence> cache = ESCAPE_HEADER_KEY_CACHE.get();
177 for (Entry<CharSequence, CharSequence> entry : frame.headers()) {
178 CharSequence headerKey = entry.getKey();
179 if (shouldEscape) {
180 CharSequence cachedHeaderKey = cache.get(headerKey);
181 if (cachedHeaderKey == null) {
182 cachedHeaderKey = escape(headerKey);
183 cache.put(headerKey, cachedHeaderKey);
184 }
185 headerKey = cachedHeaderKey;
186 }
187
188 ByteBufUtil.writeUtf8(buf, headerKey);
189 buf.writeByte(StompConstants.COLON);
190
191 CharSequence headerValue = shouldEscape? escape(entry.getValue()) : entry.getValue();
192 ByteBufUtil.writeUtf8(buf, headerValue);
193 buf.writeByte(StompConstants.LF);
194 }
195
196 buf.writeByte(StompConstants.LF);
197 }
198
199 private static ByteBuf encodeContent(StompContentSubframe content, ChannelHandlerContext ctx) {
200 if (content instanceof LastStompContentSubframe) {
201 ByteBuf buf = ctx.alloc().buffer(content.content().readableBytes() + 1);
202 buf.writeBytes(content.content());
203 buf.writeByte(StompConstants.NUL);
204 return buf;
205 }
206
207 return content.content().retain();
208 }
209
210 private static boolean shouldEscape(StompCommand command) {
211 return command != StompCommand.CONNECT && command != StompCommand.CONNECTED;
212 }
213
214 private static CharSequence escape(CharSequence input) {
215 AppendableCharSequence builder = null;
216 for (int i = 0; i < input.length(); i++) {
217 char chr = input.charAt(i);
218 if (chr == '\\') {
219 builder = escapeBuilder(builder, input, i);
220 builder.append("\\\\");
221 } else if (chr == ':') {
222 builder = escapeBuilder(builder, input, i);
223 builder.append("\\c");
224 } else if (chr == '\n') {
225 builder = escapeBuilder(builder, input, i);
226 builder.append("\\n");
227 } else if (chr == '\r') {
228 builder = escapeBuilder(builder, input, i);
229 builder.append("\\r");
230 } else if (builder != null) {
231 builder.append(chr);
232 }
233 }
234
235 return builder != null? builder : input;
236 }
237
238 private static AppendableCharSequence escapeBuilder(AppendableCharSequence builder, CharSequence input,
239 int offset) {
240 if (builder != null) {
241 return builder;
242 }
243
244
245 return new AppendableCharSequence(input.length() + 8).append(input, 0, offset);
246 }
247 }