1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.example.stomp.websocket;
17
18 import io.netty.channel.ChannelHandler.Sharable;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.MessageToMessageCodec;
21 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
22 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
23 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
24 import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
25 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
26 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler.HandshakeComplete;
27 import io.netty.handler.codec.stomp.StompSubframe;
28 import io.netty.handler.codec.stomp.StompSubframeAggregator;
29 import io.netty.handler.codec.stomp.StompSubframeDecoder;
30
31 import java.util.List;
32
33 @Sharable
34 public class StompWebSocketProtocolCodec extends MessageToMessageCodec<WebSocketFrame, StompSubframe> {
35
36 private final StompChatHandler stompChatHandler = new StompChatHandler();
37 private final StompWebSocketFrameEncoder stompWebSocketFrameEncoder = new StompWebSocketFrameEncoder();
38
39 @Override
40 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
41 if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
42 StompVersion stompVersion = StompVersion.findBySubProtocol(((HandshakeComplete) evt).selectedSubprotocol());
43 ctx.channel().attr(StompVersion.CHANNEL_ATTRIBUTE_KEY).set(stompVersion);
44 ctx.pipeline()
45 .addLast(new WebSocketFrameAggregator(65536))
46 .addLast(new StompSubframeDecoder())
47 .addLast(new StompSubframeAggregator(65536))
48 .addLast(stompChatHandler)
49 .remove(StompWebSocketClientPageHandler.INSTANCE);
50 } else {
51 super.userEventTriggered(ctx, evt);
52 }
53 }
54
55 @Override
56 protected void encode(ChannelHandlerContext ctx, StompSubframe stompFrame, List<Object> out) throws Exception {
57 stompWebSocketFrameEncoder.encode(ctx, stompFrame, out);
58 }
59
60 @Override
61 protected void decode(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame, List<Object> out) {
62 if (webSocketFrame instanceof TextWebSocketFrame || webSocketFrame instanceof BinaryWebSocketFrame) {
63 out.add(webSocketFrame.content().retain());
64 } else {
65 ctx.close();
66 }
67 }
68 }