1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package io.netty.handler.codec.http;
16
17 import io.netty.channel.ChannelHandlerContext;
18 import io.netty.channel.ChannelOutboundHandler;
19 import io.netty.channel.ChannelPromise;
20 import io.netty.util.AsciiString;
21 import io.netty.util.internal.ObjectUtil;
22
23 import java.net.SocketAddress;
24 import java.util.Collection;
25 import java.util.LinkedHashSet;
26 import java.util.List;
27 import java.util.Set;
28
29 import static io.netty.handler.codec.http.HttpResponseStatus.SWITCHING_PROTOCOLS;
30 import static io.netty.util.ReferenceCountUtil.release;
31
32
33
34
35
36
37
38
39 public class HttpClientUpgradeHandler extends HttpObjectAggregator implements ChannelOutboundHandler {
40
41
42
43
44 public enum UpgradeEvent {
45
46
47
48 UPGRADE_ISSUED,
49
50
51
52
53 UPGRADE_SUCCESSFUL,
54
55
56
57
58
59 UPGRADE_REJECTED
60 }
61
62
63
64
65 public interface SourceCodec {
66
67
68
69
70
71 void prepareUpgradeFrom(ChannelHandlerContext ctx);
72
73
74
75
76 void upgradeFrom(ChannelHandlerContext ctx);
77 }
78
79
80
81
82 public interface UpgradeCodec {
83
84
85
86 CharSequence protocol();
87
88
89
90
91
92 Collection<CharSequence> setUpgradeHeaders(ChannelHandlerContext ctx, HttpRequest upgradeRequest);
93
94
95
96
97
98
99
100
101
102 void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception;
103 }
104
105 private final SourceCodec sourceCodec;
106 private final UpgradeCodec upgradeCodec;
107 private boolean upgradeRequested;
108
109
110
111
112
113
114
115
116 public HttpClientUpgradeHandler(SourceCodec sourceCodec, UpgradeCodec upgradeCodec,
117 int maxContentLength) {
118 super(maxContentLength);
119 this.sourceCodec = ObjectUtil.checkNotNull(sourceCodec, "sourceCodec");
120 this.upgradeCodec = ObjectUtil.checkNotNull(upgradeCodec, "upgradeCodec");
121 }
122
123 @Override
124 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
125 ctx.bind(localAddress, promise);
126 }
127
128 @Override
129 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
130 ChannelPromise promise) throws Exception {
131 ctx.connect(remoteAddress, localAddress, promise);
132 }
133
134 @Override
135 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
136 ctx.disconnect(promise);
137 }
138
139 @Override
140 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
141 ctx.close(promise);
142 }
143
144 @Override
145 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
146 ctx.deregister(promise);
147 }
148
149 @Override
150 public void read(ChannelHandlerContext ctx) throws Exception {
151 ctx.read();
152 }
153
154 @Override
155 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
156 throws Exception {
157 if (!(msg instanceof HttpRequest)) {
158 ctx.write(msg, promise);
159 return;
160 }
161
162 if (upgradeRequested) {
163 promise.setFailure(new IllegalStateException(
164 "Attempting to write HTTP request with upgrade in progress"));
165 return;
166 }
167
168 upgradeRequested = true;
169 setUpgradeRequestHeaders(ctx, (HttpRequest) msg);
170
171
172 ctx.write(msg, promise);
173
174
175 ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_ISSUED);
176
177 }
178
179 @Override
180 public void flush(ChannelHandlerContext ctx) throws Exception {
181 ctx.flush();
182 }
183
184 @Override
185 protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
186 throws Exception {
187 FullHttpResponse response = null;
188 try {
189 if (!upgradeRequested) {
190 throw new IllegalStateException("Read HTTP response without requesting protocol switch");
191 }
192
193 if (msg instanceof HttpResponse) {
194 HttpResponse rep = (HttpResponse) msg;
195 if (!SWITCHING_PROTOCOLS.equals(rep.status())) {
196
197
198
199
200 ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_REJECTED);
201 removeThisHandler(ctx);
202 ctx.fireChannelRead(msg);
203 return;
204 }
205 }
206
207 if (msg instanceof FullHttpResponse) {
208 response = (FullHttpResponse) msg;
209
210 response.retain();
211 out.add(response);
212 } else {
213
214 super.decode(ctx, msg, out);
215 if (out.isEmpty()) {
216
217 return;
218 }
219
220 assert out.size() == 1;
221 response = (FullHttpResponse) out.get(0);
222 }
223
224 CharSequence upgradeHeader = response.headers().get(HttpHeaderNames.UPGRADE);
225 if (upgradeHeader != null && !AsciiString.contentEqualsIgnoreCase(upgradeCodec.protocol(), upgradeHeader)) {
226 throw new IllegalStateException(
227 "Switching Protocols response with unexpected UPGRADE protocol: " + upgradeHeader);
228 }
229
230
231 sourceCodec.prepareUpgradeFrom(ctx);
232 upgradeCodec.upgradeTo(ctx, response);
233
234
235 ctx.fireUserEventTriggered(UpgradeEvent.UPGRADE_SUCCESSFUL);
236
237
238
239 sourceCodec.upgradeFrom(ctx);
240
241
242
243 response.release();
244 out.clear();
245 removeThisHandler(ctx);
246 } catch (Throwable t) {
247 release(response);
248 ctx.fireExceptionCaught(t);
249 removeThisHandler(ctx);
250 }
251 }
252
253 private static void removeThisHandler(ChannelHandlerContext ctx) {
254 ctx.pipeline().remove(ctx.name());
255 }
256
257
258
259
260 private void setUpgradeRequestHeaders(ChannelHandlerContext ctx, HttpRequest request) {
261
262 request.headers().set(HttpHeaderNames.UPGRADE, upgradeCodec.protocol());
263
264
265 Set<CharSequence> connectionParts = new LinkedHashSet<CharSequence>(2);
266 connectionParts.addAll(upgradeCodec.setUpgradeHeaders(ctx, request));
267
268
269 StringBuilder builder = new StringBuilder();
270 for (CharSequence part : connectionParts) {
271 builder.append(part);
272 builder.append(',');
273 }
274 builder.append(HttpHeaderValues.UPGRADE);
275 request.headers().add(HttpHeaderNames.CONNECTION, builder.toString());
276 }
277 }