1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.example.mqtt.heartBeat;
17
18 import io.netty.channel.ChannelHandler.Sharable;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.ChannelInboundHandlerAdapter;
21 import io.netty.handler.codec.mqtt.MqttConnAckMessage;
22 import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
23 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
24 import io.netty.handler.codec.mqtt.MqttFixedHeader;
25 import io.netty.handler.codec.mqtt.MqttMessage;
26 import io.netty.handler.codec.mqtt.MqttMessageType;
27 import io.netty.handler.codec.mqtt.MqttQoS;
28 import io.netty.handler.timeout.IdleState;
29 import io.netty.handler.timeout.IdleStateEvent;
30 import io.netty.util.ReferenceCountUtil;
31
32 @Sharable
33 public final class MqttHeartBeatBrokerHandler extends ChannelInboundHandlerAdapter {
34
35 public static final MqttHeartBeatBrokerHandler INSTANCE = new MqttHeartBeatBrokerHandler();
36
37 private MqttHeartBeatBrokerHandler() {
38 }
39
40 @Override
41 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
42 MqttMessage mqttMessage = (MqttMessage) msg;
43 System.out.println("Received MQTT message: " + mqttMessage);
44 switch (mqttMessage.fixedHeader().messageType()) {
45 case CONNECT:
46 MqttFixedHeader connackFixedHeader =
47 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
48 MqttConnAckVariableHeader mqttConnAckVariableHeader =
49 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
50 MqttConnAckMessage connack = new MqttConnAckMessage(connackFixedHeader, mqttConnAckVariableHeader);
51 ctx.writeAndFlush(connack);
52 break;
53 case PINGREQ:
54 MqttFixedHeader pingreqFixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false,
55 MqttQoS.AT_MOST_ONCE, false, 0);
56 MqttMessage pingResp = new MqttMessage(pingreqFixedHeader);
57 ctx.writeAndFlush(pingResp);
58 break;
59 case DISCONNECT:
60 ctx.close();
61 break;
62 default:
63 System.out.println("Unexpected message type: " + mqttMessage.fixedHeader().messageType());
64 ReferenceCountUtil.release(msg);
65 ctx.close();
66 }
67 }
68
69 @Override
70 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
71 System.out.println("Channel heartBeat lost");
72 if (evt instanceof IdleStateEvent && IdleState.READER_IDLE == ((IdleStateEvent) evt).state()) {
73 ctx.close();
74 }
75 }
76
77 @Override
78 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
79 cause.printStackTrace();
80 ctx.close();
81 }
82 }