1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http;
16
17 import io.netty.buffer.Unpooled;
18 import io.netty.channel.ChannelFuture;
19 import io.netty.channel.ChannelFutureListener;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.util.ReferenceCountUtil;
22 import io.netty.util.ReferenceCounted;
23
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.List;
27
28 import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
29 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
30 import static io.netty.util.AsciiString.containsAllContentEqualsIgnoreCase;
31 import static io.netty.util.AsciiString.containsContentEqualsIgnoreCase;
32 import static io.netty.util.internal.ObjectUtil.checkNotNull;
33 import static io.netty.util.internal.StringUtil.COMMA;
34
35
36
37
38
39
40 public class HttpServerUpgradeHandler extends HttpObjectAggregator {
41
42
43
44
45 public interface SourceCodec {
46
47
48
49 void upgradeFrom(ChannelHandlerContext ctx);
50 }
51
52
53
54
55 public interface UpgradeCodec {
56
57
58
59
60 Collection<CharSequence> requiredUpgradeHeaders();
61
62
63
64
65
66
67
68
69
70 boolean prepareUpgradeResponse(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest,
71 HttpHeaders upgradeHeaders);
72
73
74
75
76
77
78
79
80 void upgradeTo(ChannelHandlerContext ctx, FullHttpRequest upgradeRequest);
81 }
82
83
84
85
86 public interface UpgradeCodecFactory {
87
88
89
90
91
92
93
94 UpgradeCodec newUpgradeCodec(CharSequence protocol);
95 }
96
97
98
99
100
101
102 public static final class UpgradeEvent implements ReferenceCounted {
103 private final CharSequence protocol;
104 private final FullHttpRequest upgradeRequest;
105
106 UpgradeEvent(CharSequence protocol, FullHttpRequest upgradeRequest) {
107 this.protocol = protocol;
108 this.upgradeRequest = upgradeRequest;
109 }
110
111
112
113
114 public CharSequence protocol() {
115 return protocol;
116 }
117
118
119
120
121 public FullHttpRequest upgradeRequest() {
122 return upgradeRequest;
123 }
124
125 @Override
126 public int refCnt() {
127 return upgradeRequest.refCnt();
128 }
129
130 @Override
131 public UpgradeEvent retain() {
132 upgradeRequest.retain();
133 return this;
134 }
135
136 @Override
137 public UpgradeEvent retain(int increment) {
138 upgradeRequest.retain(increment);
139 return this;
140 }
141
142 @Override
143 public UpgradeEvent touch() {
144 upgradeRequest.touch();
145 return this;
146 }
147
148 @Override
149 public UpgradeEvent touch(Object hint) {
150 upgradeRequest.touch(hint);
151 return this;
152 }
153
154 @Override
155 public boolean release() {
156 return upgradeRequest.release();
157 }
158
159 @Override
160 public boolean release(int decrement) {
161 return upgradeRequest.release(decrement);
162 }
163
164 @Override
165 public String toString() {
166 return "UpgradeEvent [protocol=" + protocol + ", upgradeRequest=" + upgradeRequest + ']';
167 }
168 }
169
170 private final SourceCodec sourceCodec;
171 private final UpgradeCodecFactory upgradeCodecFactory;
172 private final HttpHeadersFactory headersFactory;
173 private final HttpHeadersFactory trailersFactory;
174 private boolean handlingUpgrade;
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
/**
 * Creates an upgrade handler that passes a maximum content length of {@code 0}
 * to the aggregator and uses the default header and trailer factories.
 *
 * @param sourceCodec         the codec that is being upgraded from; must not be {@code null}
 * @param upgradeCodecFactory factory for the codec to upgrade to; must not be {@code null}
 */
public HttpServerUpgradeHandler(SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory) {
    // The three-argument overload supplies the same default header/trailer factories.
    this(sourceCodec, upgradeCodecFactory, 0);
}
194
195
196
197
198
199
200
201
202
/**
 * Creates an upgrade handler with the given aggregation limit and the default
 * header and trailer factories.
 *
 * @param sourceCodec         the codec that is being upgraded from; must not be {@code null}
 * @param upgradeCodecFactory factory for the codec to upgrade to; must not be {@code null}
 * @param maxContentLength    the maximum content length passed to the aggregator base class
 */
public HttpServerUpgradeHandler(
        SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory, int maxContentLength) {
    this(
            sourceCodec,
            upgradeCodecFactory,
            maxContentLength,
            DefaultHttpHeadersFactory.headersFactory(),
            DefaultHttpHeadersFactory.trailersFactory());
}
208
209
210
211
212
213
214
215
216
217
/**
 * Creates an upgrade handler whose upgrade response uses the default header and
 * trailer factories with validation switched on or off.
 *
 * @param sourceCodec         the codec that is being upgraded from; must not be {@code null}
 * @param upgradeCodecFactory factory for the codec to upgrade to; must not be {@code null}
 * @param maxContentLength    the maximum content length passed to the aggregator base class
 * @param validateHeaders     value forwarded to {@code withValidation(...)} on both default factories
 */
public HttpServerUpgradeHandler(SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory,
                                int maxContentLength, boolean validateHeaders) {
    this(
            sourceCodec,
            upgradeCodecFactory,
            maxContentLength,
            DefaultHttpHeadersFactory.headersFactory().withValidation(validateHeaders),
            DefaultHttpHeadersFactory.trailersFactory().withValidation(validateHeaders));
}
224
225
226
227
228
229
230
231
232
233
234
235
236
/**
 * Creates an upgrade handler with explicit header and trailer factories.
 *
 * @param sourceCodec         the codec that is being upgraded from
 * @param upgradeCodecFactory factory for the codec to upgrade to
 * @param maxContentLength    the maximum content length passed to the aggregator base class
 * @param headersFactory      factory used for the headers of the upgrade response
 * @param trailersFactory     factory used for the trailers of the upgrade response
 * @throws NullPointerException if any of the object arguments is {@code null}
 */
public HttpServerUpgradeHandler(
        SourceCodec sourceCodec, UpgradeCodecFactory upgradeCodecFactory, int maxContentLength,
        HttpHeadersFactory headersFactory, HttpHeadersFactory trailersFactory) {
    super(maxContentLength);

    // Arguments are validated in declaration order, so the first null one is reported.
    this.sourceCodec =
            checkNotNull(sourceCodec, "sourceCodec");
    this.upgradeCodecFactory =
            checkNotNull(upgradeCodecFactory, "upgradeCodecFactory");
    this.headersFactory =
            checkNotNull(headersFactory, "headersFactory");
    this.trailersFactory =
            checkNotNull(trailersFactory, "trailersFactory");
}
247
248 @Override
249 protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
250 throws Exception {
251
252 if (!handlingUpgrade) {
253
254 if (msg instanceof HttpRequest) {
255 HttpRequest req = (HttpRequest) msg;
256 if (req.headers().contains(HttpHeaderNames.UPGRADE) &&
257 shouldHandleUpgradeRequest(req)) {
258 handlingUpgrade = true;
259 } else {
260 ReferenceCountUtil.retain(msg);
261 ctx.fireChannelRead(msg);
262 return;
263 }
264 } else {
265 ReferenceCountUtil.retain(msg);
266 ctx.fireChannelRead(msg);
267 return;
268 }
269 }
270
271 FullHttpRequest fullRequest;
272 if (msg instanceof FullHttpRequest) {
273 fullRequest = (FullHttpRequest) msg;
274 ReferenceCountUtil.retain(msg);
275 out.add(msg);
276 } else {
277
278 super.decode(ctx, msg, out);
279 if (out.isEmpty()) {
280
281 return;
282 }
283
284
285 assert out.size() == 1;
286 handlingUpgrade = false;
287 fullRequest = (FullHttpRequest) out.get(0);
288 }
289
290 if (upgrade(ctx, fullRequest)) {
291
292
293
294 out.clear();
295 }
296
297
298
299 }
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314 protected boolean shouldHandleUpgradeRequest(HttpRequest req) {
315 return true;
316 }
317
318
319
320
321
322
323
324
325
326 private boolean upgrade(final ChannelHandlerContext ctx, final FullHttpRequest request) {
327
328 final List<CharSequence> requestedProtocols = splitHeader(request.headers().get(HttpHeaderNames.UPGRADE));
329 final int numRequestedProtocols = requestedProtocols.size();
330 UpgradeCodec upgradeCodec = null;
331 CharSequence upgradeProtocol = null;
332 for (int i = 0; i < numRequestedProtocols; i ++) {
333 final CharSequence p = requestedProtocols.get(i);
334 final UpgradeCodec c = upgradeCodecFactory.newUpgradeCodec(p);
335 if (c != null) {
336 upgradeProtocol = p;
337 upgradeCodec = c;
338 break;
339 }
340 }
341
342 if (upgradeCodec == null) {
343
344 return false;
345 }
346
347
348 List<String> connectionHeaderValues = request.headers().getAll(HttpHeaderNames.CONNECTION);
349
350 if (connectionHeaderValues == null || connectionHeaderValues.isEmpty()) {
351 return false;
352 }
353
354 final StringBuilder concatenatedConnectionValue = new StringBuilder(connectionHeaderValues.size() * 10);
355 for (CharSequence connectionHeaderValue : connectionHeaderValues) {
356 concatenatedConnectionValue.append(connectionHeaderValue).append(COMMA);
357 }
358 concatenatedConnectionValue.setLength(concatenatedConnectionValue.length() - 1);
359
360
361 Collection<CharSequence> requiredHeaders = upgradeCodec.requiredUpgradeHeaders();
362 List<CharSequence> values = splitHeader(concatenatedConnectionValue);
363 if (!containsContentEqualsIgnoreCase(values, HttpHeaderNames.UPGRADE) ||
364 !containsAllContentEqualsIgnoreCase(values, requiredHeaders)) {
365 return false;
366 }
367
368
369 for (CharSequence requiredHeader : requiredHeaders) {
370 if (!request.headers().contains(requiredHeader)) {
371 return false;
372 }
373 }
374
375
376
377 final FullHttpResponse upgradeResponse = createUpgradeResponse(upgradeProtocol);
378 if (!upgradeCodec.prepareUpgradeResponse(ctx, request, upgradeResponse.headers())) {
379 return false;
380 }
381
382
383 final UpgradeEvent event = new UpgradeEvent(upgradeProtocol, request);
384
385
386
387
388
389 try {
390 final ChannelFuture writeComplete = ctx.writeAndFlush(upgradeResponse);
391
392 sourceCodec.upgradeFrom(ctx);
393 upgradeCodec.upgradeTo(ctx, request);
394
395
396 ctx.pipeline().remove(HttpServerUpgradeHandler.this);
397
398
399
400 ctx.fireUserEventTriggered(event.retain());
401
402
403
404
405 writeComplete.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
406 } finally {
407
408 event.release();
409 }
410 return true;
411 }
412
413
414
415
416 private FullHttpResponse createUpgradeResponse(CharSequence upgradeProtocol) {
417 DefaultFullHttpResponse res = new DefaultFullHttpResponse(
418 HTTP_1_1, SWITCHING_PROTOCOLS, Unpooled.EMPTY_BUFFER, headersFactory, trailersFactory);
419 res.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.UPGRADE);
420 res.headers().add(HttpHeaderNames.UPGRADE, upgradeProtocol);
421 return res;
422 }
423
424
425
426
427
428 private static List<CharSequence> splitHeader(CharSequence header) {
429 final StringBuilder builder = new StringBuilder(header.length());
430 final List<CharSequence> protocols = new ArrayList<CharSequence>(4);
431 for (int i = 0; i < header.length(); ++i) {
432 char c = header.charAt(i);
433 if (Character.isWhitespace(c)) {
434
435 continue;
436 }
437 if (c == ',') {
438
439 protocols.add(builder.toString());
440 builder.setLength(0);
441 } else {
442 builder.append(c);
443 }
444 }
445
446
447 if (builder.length() > 0) {
448 protocols.add(builder.toString());
449 }
450
451 return protocols;
452 }
453 }