1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.DecoderException;
21 import io.netty.util.Attribute;
22 import io.netty.util.AttributeKey;
23
24 import static io.netty.handler.codec.mqtt.MqttConstant.MIN_CLIENT_ID_LENGTH;
25
26 final class MqttCodecUtil {
27
28 private static final char[] TOPIC_WILDCARDS = {'#', '+'};
29
30 static final AttributeKey<MqttVersion> MQTT_VERSION_KEY = AttributeKey.valueOf("NETTY_CODEC_MQTT_VERSION");
31
32 static MqttVersion getMqttVersion(ChannelHandlerContext ctx) {
33 Attribute<MqttVersion> attr = ctx.channel().attr(MQTT_VERSION_KEY);
34 MqttVersion version = attr.get();
35 if (version == null) {
36 return MqttVersion.MQTT_3_1_1;
37 }
38 return version;
39 }
40
41 static void setMqttVersion(ChannelHandlerContext ctx, MqttVersion version) {
42 Attribute<MqttVersion> attr = ctx.channel().attr(MQTT_VERSION_KEY);
43 attr.set(version);
44 }
45
46 static boolean isValidPublishTopicName(String topicName) {
47
48 for (char c : TOPIC_WILDCARDS) {
49 if (topicName.indexOf(c) >= 0) {
50 return false;
51 }
52 }
53 return true;
54 }
55
56 static boolean isValidMessageId(int messageId) {
57 return messageId != 0;
58 }
59
60 static boolean isValidClientId(MqttVersion mqttVersion, int maxClientIdLength, String clientId) {
61 if (mqttVersion == MqttVersion.MQTT_3_1) {
62 return clientId != null && clientId.length() >= MIN_CLIENT_ID_LENGTH &&
63 clientId.length() <= maxClientIdLength;
64 }
65 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_5) {
66
67
68 return clientId != null;
69 }
70 throw new IllegalArgumentException(mqttVersion + " is unknown mqtt version");
71 }
72
73 static MqttFixedHeader validateFixedHeader(ChannelHandlerContext ctx, MqttFixedHeader mqttFixedHeader) {
74 switch (mqttFixedHeader.messageType()) {
75 case PUBREL:
76 case SUBSCRIBE:
77 case UNSUBSCRIBE:
78 if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) {
79 throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1");
80 }
81 return mqttFixedHeader;
82 case AUTH:
83 if (MqttCodecUtil.getMqttVersion(ctx) != MqttVersion.MQTT_5) {
84 throw new DecoderException("AUTH message requires at least MQTT 5");
85 }
86 return mqttFixedHeader;
87 default:
88 return mqttFixedHeader;
89 }
90 }
91
92 static MqttFixedHeader resetUnusedFields(MqttFixedHeader mqttFixedHeader) {
93 switch (mqttFixedHeader.messageType()) {
94 case CONNECT:
95 case CONNACK:
96 case PUBACK:
97 case PUBREC:
98 case PUBCOMP:
99 case SUBACK:
100 case UNSUBACK:
101 case PINGREQ:
102 case PINGRESP:
103 case DISCONNECT:
104 if (mqttFixedHeader.isDup() ||
105 mqttFixedHeader.qosLevel() != MqttQoS.AT_MOST_ONCE ||
106 mqttFixedHeader.isRetain()) {
107 return new MqttFixedHeader(
108 mqttFixedHeader.messageType(),
109 false,
110 MqttQoS.AT_MOST_ONCE,
111 false,
112 mqttFixedHeader.remainingLength());
113 }
114 return mqttFixedHeader;
115 case PUBREL:
116 case SUBSCRIBE:
117 case UNSUBSCRIBE:
118 if (mqttFixedHeader.isRetain()) {
119 return new MqttFixedHeader(
120 mqttFixedHeader.messageType(),
121 mqttFixedHeader.isDup(),
122 mqttFixedHeader.qosLevel(),
123 false,
124 mqttFixedHeader.remainingLength());
125 }
126 return mqttFixedHeader;
127 default:
128 return mqttFixedHeader;
129 }
130 }
131
132 private MqttCodecUtil() { }
133 }