1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.example.stomp;
17
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.stomp.DefaultStompFrame;
import io.netty.handler.codec.stomp.StompCommand;
import io.netty.handler.codec.stomp.StompFrame;
import io.netty.handler.codec.stomp.StompHeaders;

import java.nio.charset.StandardCharsets;
24
25
26
27
28 public class StompClientHandler extends SimpleChannelInboundHandler<StompFrame> {
29
30 private enum ClientState {
31 AUTHENTICATING,
32 AUTHENTICATED,
33 SUBSCRIBED,
34 DISCONNECTING
35 }
36
37 private ClientState state;
38
39 @Override
40 public void channelActive(ChannelHandlerContext ctx) throws Exception {
41 state = ClientState.AUTHENTICATING;
42 StompFrame connFrame = new DefaultStompFrame(StompCommand.CONNECT);
43 connFrame.headers().set(StompHeaders.ACCEPT_VERSION, "1.2");
44 connFrame.headers().set(StompHeaders.HOST, StompClient.HOST);
45 connFrame.headers().set(StompHeaders.LOGIN, StompClient.LOGIN);
46 connFrame.headers().set(StompHeaders.PASSCODE, StompClient.PASSCODE);
47 ctx.writeAndFlush(connFrame);
48 }
49
50 @Override
51 protected void channelRead0(ChannelHandlerContext ctx, StompFrame frame) throws Exception {
52 String subscrReceiptId = "001";
53 String disconReceiptId = "002";
54 switch (frame.command()) {
55 case CONNECTED:
56 StompFrame subscribeFrame = new DefaultStompFrame(StompCommand.SUBSCRIBE);
57 subscribeFrame.headers().set(StompHeaders.DESTINATION, StompClient.TOPIC);
58 subscribeFrame.headers().set(StompHeaders.RECEIPT, subscrReceiptId);
59 subscribeFrame.headers().set(StompHeaders.ID, "1");
60 System.out.println("connected, sending subscribe frame: " + subscribeFrame);
61 state = ClientState.AUTHENTICATED;
62 ctx.writeAndFlush(subscribeFrame);
63 break;
64 case RECEIPT:
65 String receiptHeader = frame.headers().getAsString(StompHeaders.RECEIPT_ID);
66 if (state == ClientState.AUTHENTICATED && receiptHeader.equals(subscrReceiptId)) {
67 StompFrame msgFrame = new DefaultStompFrame(StompCommand.SEND);
68 msgFrame.headers().set(StompHeaders.DESTINATION, StompClient.TOPIC);
69 msgFrame.content().writeBytes("some payload".getBytes());
70 System.out.println("subscribed, sending message frame: " + msgFrame);
71 state = ClientState.SUBSCRIBED;
72 ctx.writeAndFlush(msgFrame);
73 } else if (state == ClientState.DISCONNECTING && receiptHeader.equals(disconReceiptId)) {
74 System.out.println("disconnected");
75 ctx.close();
76 } else {
77 throw new IllegalStateException("received: " + frame + ", while internal state is " + state);
78 }
79 break;
80 case MESSAGE:
81 if (state == ClientState.SUBSCRIBED) {
82 System.out.println("received frame: " + frame);
83 StompFrame disconnFrame = new DefaultStompFrame(StompCommand.DISCONNECT);
84 disconnFrame.headers().set(StompHeaders.RECEIPT, disconReceiptId);
85 System.out.println("sending disconnect frame: " + disconnFrame);
86 state = ClientState.DISCONNECTING;
87 ctx.writeAndFlush(disconnFrame);
88 }
89 break;
90 default:
91 break;
92 }
93 }
94
95 @Override
96 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
97 cause.printStackTrace();
98 ctx.close();
99 }
100 }