1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.handler.codec.DecoderException;
22 import io.netty.handler.codec.ReplayingDecoder;
23 import io.netty.handler.codec.TooLongFrameException;
24 import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState;
25 import io.netty.handler.codec.mqtt.MqttProperties.IntegerProperty;
26 import io.netty.util.CharsetUtil;
27 import io.netty.util.internal.ObjectUtil;
28
29 import java.util.ArrayList;
30 import java.util.List;
31
32 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId;
34 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName;
35 import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields;
36 import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader;
37 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_BYTES_IN_MESSAGE;
38 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
39 import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy;
40
41
42
43
44
45
46
47
48
49 public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
50
51
52
53
54
55
56 enum DecoderState {
57 READ_FIXED_HEADER,
58 READ_VARIABLE_HEADER,
59 READ_PAYLOAD,
60 BAD_MESSAGE,
61 }
62
63 private MqttFixedHeader mqttFixedHeader;
64 private Object variableHeader;
65 private int bytesRemainingInVariablePart;
66
67 private final int maxBytesInMessage;
68 private final int maxClientIdLength;
69
70 public MqttDecoder() {
71 this(DEFAULT_MAX_BYTES_IN_MESSAGE, DEFAULT_MAX_CLIENT_ID_LENGTH);
72 }
73
74 public MqttDecoder(int maxBytesInMessage) {
75 this(maxBytesInMessage, DEFAULT_MAX_CLIENT_ID_LENGTH);
76 }
77
78 public MqttDecoder(int maxBytesInMessage, int maxClientIdLength) {
79 super(DecoderState.READ_FIXED_HEADER);
80 this.maxBytesInMessage = ObjectUtil.checkPositive(maxBytesInMessage, "maxBytesInMessage");
81 this.maxClientIdLength = ObjectUtil.checkPositive(maxClientIdLength, "maxClientIdLength");
82 }
83
84 @Override
85 protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
86 switch (state()) {
87 case READ_FIXED_HEADER: try {
88 mqttFixedHeader = decodeFixedHeader(ctx, buffer);
89 bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
90 checkpoint(DecoderState.READ_VARIABLE_HEADER);
91
92 } catch (Exception cause) {
93 out.add(invalidMessage(cause));
94 return;
95 }
96
97 case READ_VARIABLE_HEADER: try {
98 final Result<?> decodedVariableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
99 variableHeader = decodedVariableHeader.value;
100 if (bytesRemainingInVariablePart > maxBytesInMessage) {
101 buffer.skipBytes(actualReadableBytes());
102 throw new TooLongFrameException("too large message: " + bytesRemainingInVariablePart + " bytes");
103 }
104 bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
105 checkpoint(DecoderState.READ_PAYLOAD);
106
107 } catch (Exception cause) {
108 out.add(invalidMessage(cause));
109 return;
110 }
111
112 case READ_PAYLOAD: try {
113 final Result<?> decodedPayload =
114 decodePayload(
115 ctx,
116 buffer,
117 mqttFixedHeader.messageType(),
118 bytesRemainingInVariablePart,
119 maxClientIdLength,
120 variableHeader);
121 bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
122 if (bytesRemainingInVariablePart != 0) {
123 throw new DecoderException(
124 "non-zero remaining payload bytes: " +
125 bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
126 }
127 checkpoint(DecoderState.READ_FIXED_HEADER);
128 MqttMessage message = MqttMessageFactory.newMessage(
129 mqttFixedHeader, variableHeader, decodedPayload.value);
130 mqttFixedHeader = null;
131 variableHeader = null;
132 out.add(message);
133 break;
134 } catch (Exception cause) {
135 out.add(invalidMessage(cause));
136 return;
137 }
138
139 case BAD_MESSAGE:
140
141 buffer.skipBytes(actualReadableBytes());
142 break;
143
144 default:
145
146 throw new Error();
147 }
148 }
149
150 private MqttMessage invalidMessage(Throwable cause) {
151 checkpoint(DecoderState.BAD_MESSAGE);
152 return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
153 }
154
155
156
157
158
159
160
161
162
163
164
165 private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) {
166 short b1 = buffer.readUnsignedByte();
167
168 MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4);
169 boolean dupFlag = (b1 & 0x08) == 0x08;
170 int qosLevel = (b1 & 0x06) >> 1;
171 boolean retain = (b1 & 0x01) != 0;
172
173 switch (messageType) {
174 case PUBLISH:
175 if (qosLevel == 3) {
176 throw new DecoderException("Illegal QOS Level in fixed header of PUBLISH message ("
177 + qosLevel + ')');
178 }
179 break;
180
181 case PUBREL:
182 case SUBSCRIBE:
183 case UNSUBSCRIBE:
184 if (dupFlag) {
185 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
186 + " message, must be 0, found 1");
187 }
188 if (qosLevel != 1) {
189 throw new DecoderException("Illegal QOS Level in fixed header of " + messageType
190 + " message, must be 1, found " + qosLevel);
191 }
192 if (retain) {
193 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
194 + " message, must be 0, found 1");
195 }
196 break;
197
198 case AUTH:
199 case CONNACK:
200 case CONNECT:
201 case DISCONNECT:
202 case PINGREQ:
203 case PINGRESP:
204 case PUBACK:
205 case PUBCOMP:
206 case PUBREC:
207 case SUBACK:
208 case UNSUBACK:
209 if (dupFlag) {
210 throw new DecoderException("Illegal BIT 3 in fixed header of " + messageType
211 + " message, must be 0, found 1");
212 }
213 if (qosLevel != 0) {
214 throw new DecoderException("Illegal BIT 2 or 1 in fixed header of " + messageType
215 + " message, must be 0, found " + qosLevel);
216 }
217 if (retain) {
218 throw new DecoderException("Illegal BIT 0 in fixed header of " + messageType
219 + " message, must be 0, found 1");
220 }
221 break;
222 default:
223 throw new DecoderException("Unknown message type, do not know how to validate fixed header");
224 }
225
226 int remainingLength = 0;
227 int multiplier = 1;
228 short digit;
229 int loops = 0;
230 do {
231 digit = buffer.readUnsignedByte();
232 remainingLength += (digit & 127) * multiplier;
233 multiplier *= 128;
234 loops++;
235 } while ((digit & 128) != 0 && loops < 4);
236
237
238 if (loops == 4 && (digit & 128) != 0) {
239 throw new DecoderException("remaining length exceeds 4 digits (" + messageType + ')');
240 }
241 MqttFixedHeader decodedFixedHeader =
242 new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength);
243 return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader));
244 }
245
246
247
248
249
250
251
252 private Result<?> decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
253 switch (mqttFixedHeader.messageType()) {
254 case CONNECT:
255 return decodeConnectionVariableHeader(ctx, buffer);
256
257 case CONNACK:
258 return decodeConnAckVariableHeader(ctx, buffer);
259
260 case UNSUBSCRIBE:
261 case SUBSCRIBE:
262 case SUBACK:
263 case UNSUBACK:
264 return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer);
265
266 case PUBACK:
267 case PUBREC:
268 case PUBCOMP:
269 case PUBREL:
270 return decodePubReplyMessage(buffer);
271
272 case PUBLISH:
273 return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader);
274
275 case DISCONNECT:
276 case AUTH:
277 return decodeReasonCodeAndPropertiesVariableHeader(buffer);
278
279 case PINGREQ:
280 case PINGRESP:
281
282 return new Result<Object>(null, 0);
283 default:
284
285 throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType());
286 }
287 }
288
289 private static Result<MqttConnectVariableHeader> decodeConnectionVariableHeader(
290 ChannelHandlerContext ctx,
291 ByteBuf buffer) {
292 final Result<String> protoString = decodeString(buffer);
293 int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
294
295 final byte protocolLevel = buffer.readByte();
296 numberOfBytesConsumed += 1;
297
298 MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel);
299 MqttCodecUtil.setMqttVersion(ctx, version);
300
301 final int b1 = buffer.readUnsignedByte();
302 numberOfBytesConsumed += 1;
303
304 final int keepAlive = decodeMsbLsb(buffer);
305 numberOfBytesConsumed += 2;
306
307 final boolean hasUserName = (b1 & 0x80) == 0x80;
308 final boolean hasPassword = (b1 & 0x40) == 0x40;
309 final boolean willRetain = (b1 & 0x20) == 0x20;
310 final int willQos = (b1 & 0x18) >> 3;
311 final boolean willFlag = (b1 & 0x04) == 0x04;
312 final boolean cleanSession = (b1 & 0x02) == 0x02;
313 if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) {
314 final boolean zeroReservedFlag = (b1 & 0x01) == 0x0;
315 if (!zeroReservedFlag) {
316
317
318
319 throw new DecoderException("non-zero reserved flag");
320 }
321 }
322
323 final MqttProperties properties;
324 if (version == MqttVersion.MQTT_5) {
325 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
326 properties = propertiesResult.value;
327 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
328 } else {
329 properties = MqttProperties.NO_PROPERTIES;
330 }
331
332 final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
333 version.protocolName(),
334 version.protocolLevel(),
335 hasUserName,
336 hasPassword,
337 willRetain,
338 willQos,
339 willFlag,
340 cleanSession,
341 keepAlive,
342 properties);
343 return new Result<MqttConnectVariableHeader>(mqttConnectVariableHeader, numberOfBytesConsumed);
344 }
345
346 private static Result<MqttConnAckVariableHeader> decodeConnAckVariableHeader(
347 ChannelHandlerContext ctx,
348 ByteBuf buffer) {
349 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
350 final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
351 byte returnCode = buffer.readByte();
352 int numberOfBytesConsumed = 2;
353
354 final MqttProperties properties;
355 if (mqttVersion == MqttVersion.MQTT_5) {
356 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
357 properties = propertiesResult.value;
358 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
359 } else {
360 properties = MqttProperties.NO_PROPERTIES;
361 }
362
363 final MqttConnAckVariableHeader mqttConnAckVariableHeader =
364 new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties);
365 return new Result<MqttConnAckVariableHeader>(mqttConnAckVariableHeader, numberOfBytesConsumed);
366 }
367
368 private static Result<MqttMessageIdAndPropertiesVariableHeader> decodeMessageIdAndPropertiesVariableHeader(
369 ChannelHandlerContext ctx,
370 ByteBuf buffer) {
371 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
372 final int packetId = decodeMessageId(buffer);
373
374 final MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader;
375 final int mqtt5Consumed;
376
377 if (mqttVersion == MqttVersion.MQTT_5) {
378 final Result<MqttProperties> properties = decodeProperties(buffer);
379 mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId, properties.value);
380 mqtt5Consumed = properties.numberOfBytesConsumed;
381 } else {
382 mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId,
383 MqttProperties.NO_PROPERTIES);
384 mqtt5Consumed = 0;
385 }
386
387 return new Result<MqttMessageIdAndPropertiesVariableHeader>(mqttVariableHeader,
388 2 + mqtt5Consumed);
389 }
390
391 private Result<MqttPubReplyMessageVariableHeader> decodePubReplyMessage(ByteBuf buffer) {
392 final int packetId = decodeMessageId(buffer);
393
394 final MqttPubReplyMessageVariableHeader mqttPubAckVariableHeader;
395 final int consumed;
396 final int packetIdNumberOfBytesConsumed = 2;
397 if (bytesRemainingInVariablePart > 3) {
398 final byte reasonCode = buffer.readByte();
399 final Result<MqttProperties> properties = decodeProperties(buffer);
400 mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId,
401 reasonCode,
402 properties.value);
403 consumed = packetIdNumberOfBytesConsumed + 1 + properties.numberOfBytesConsumed;
404 } else if (bytesRemainingInVariablePart > 2) {
405 final byte reasonCode = buffer.readByte();
406 mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId,
407 reasonCode,
408 MqttProperties.NO_PROPERTIES);
409 consumed = packetIdNumberOfBytesConsumed + 1;
410 } else {
411 mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId,
412 (byte) 0,
413 MqttProperties.NO_PROPERTIES);
414 consumed = packetIdNumberOfBytesConsumed;
415 }
416
417 return new Result<MqttPubReplyMessageVariableHeader>(mqttPubAckVariableHeader, consumed);
418 }
419
420 private Result<MqttReasonCodeAndPropertiesVariableHeader> decodeReasonCodeAndPropertiesVariableHeader(
421 ByteBuf buffer) {
422 final byte reasonCode;
423 final MqttProperties properties;
424 final int consumed;
425 if (bytesRemainingInVariablePart > 1) {
426 reasonCode = buffer.readByte();
427 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
428 properties = propertiesResult.value;
429 consumed = 1 + propertiesResult.numberOfBytesConsumed;
430 } else if (bytesRemainingInVariablePart > 0) {
431 reasonCode = buffer.readByte();
432 properties = MqttProperties.NO_PROPERTIES;
433 consumed = 1;
434 } else {
435 reasonCode = 0;
436 properties = MqttProperties.NO_PROPERTIES;
437 consumed = 0;
438 }
439 final MqttReasonCodeAndPropertiesVariableHeader mqttReasonAndPropsVariableHeader =
440 new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties);
441
442 return new Result<MqttReasonCodeAndPropertiesVariableHeader>(
443 mqttReasonAndPropsVariableHeader,
444 consumed);
445 }
446
447 private Result<MqttPublishVariableHeader> decodePublishVariableHeader(
448 ChannelHandlerContext ctx,
449 ByteBuf buffer,
450 MqttFixedHeader mqttFixedHeader) {
451 final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx);
452 final Result<String> decodedTopic = decodeString(buffer);
453 if (!isValidPublishTopicName(decodedTopic.value)) {
454 throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)");
455 }
456 int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
457
458 int messageId = -1;
459 if (mqttFixedHeader.qosLevel().value() > 0) {
460 messageId = decodeMessageId(buffer);
461 numberOfBytesConsumed += 2;
462 }
463
464 final MqttProperties properties;
465 if (mqttVersion == MqttVersion.MQTT_5) {
466 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
467 properties = propertiesResult.value;
468 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
469 } else {
470 properties = MqttProperties.NO_PROPERTIES;
471 }
472
473 final MqttPublishVariableHeader mqttPublishVariableHeader =
474 new MqttPublishVariableHeader(decodedTopic.value, messageId, properties);
475 return new Result<MqttPublishVariableHeader>(mqttPublishVariableHeader, numberOfBytesConsumed);
476 }
477
478
479
480
481 private static int decodeMessageId(ByteBuf buffer) {
482 final int messageId = decodeMsbLsb(buffer);
483 if (!isValidMessageId(messageId)) {
484 throw new DecoderException("invalid messageId: " + messageId);
485 }
486 return messageId;
487 }
488
489
490
491
492
493
494
495
496
497
498 private static Result<?> decodePayload(
499 ChannelHandlerContext ctx,
500 ByteBuf buffer,
501 MqttMessageType messageType,
502 int bytesRemainingInVariablePart,
503 int maxClientIdLength,
504 Object variableHeader) {
505 switch (messageType) {
506 case CONNECT:
507 return decodeConnectionPayload(buffer, maxClientIdLength, (MqttConnectVariableHeader) variableHeader);
508
509 case SUBSCRIBE:
510 return decodeSubscribePayload(buffer, bytesRemainingInVariablePart);
511
512 case SUBACK:
513 return decodeSubackPayload(buffer, bytesRemainingInVariablePart);
514
515 case UNSUBSCRIBE:
516 return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart);
517
518 case UNSUBACK:
519 return decodeUnsubAckPayload(ctx, buffer, bytesRemainingInVariablePart);
520
521 case PUBLISH:
522 return decodePublishPayload(buffer, bytesRemainingInVariablePart);
523
524 default:
525
526 return new Result<Object>(null, 0);
527 }
528 }
529
530 private static Result<MqttConnectPayload> decodeConnectionPayload(
531 ByteBuf buffer,
532 int maxClientIdLength,
533 MqttConnectVariableHeader mqttConnectVariableHeader) {
534 final Result<String> decodedClientId = decodeString(buffer);
535 final String decodedClientIdValue = decodedClientId.value;
536 final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
537 (byte) mqttConnectVariableHeader.version());
538 if (!isValidClientId(mqttVersion, maxClientIdLength, decodedClientIdValue)) {
539 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + decodedClientIdValue);
540 }
541 int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
542
543 Result<String> decodedWillTopic = null;
544 byte[] decodedWillMessage = null;
545
546 final MqttProperties willProperties;
547 if (mqttConnectVariableHeader.isWillFlag()) {
548 if (mqttVersion == MqttVersion.MQTT_5) {
549 final Result<MqttProperties> propertiesResult = decodeProperties(buffer);
550 willProperties = propertiesResult.value;
551 numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed;
552 } else {
553 willProperties = MqttProperties.NO_PROPERTIES;
554 }
555 decodedWillTopic = decodeString(buffer, 0, 32767);
556 numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
557 decodedWillMessage = decodeByteArray(buffer);
558 numberOfBytesConsumed += decodedWillMessage.length + 2;
559 } else {
560 willProperties = MqttProperties.NO_PROPERTIES;
561 }
562 Result<String> decodedUserName = null;
563 byte[] decodedPassword = null;
564 if (mqttConnectVariableHeader.hasUserName()) {
565 decodedUserName = decodeString(buffer);
566 numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
567 }
568 if (mqttConnectVariableHeader.hasPassword()) {
569 decodedPassword = decodeByteArray(buffer);
570 numberOfBytesConsumed += decodedPassword.length + 2;
571 }
572
573 final MqttConnectPayload mqttConnectPayload =
574 new MqttConnectPayload(
575 decodedClientId.value,
576 willProperties,
577 decodedWillTopic != null ? decodedWillTopic.value : null,
578 decodedWillMessage,
579 decodedUserName != null ? decodedUserName.value : null,
580 decodedPassword);
581 return new Result<MqttConnectPayload>(mqttConnectPayload, numberOfBytesConsumed);
582 }
583
584 private static Result<MqttSubscribePayload> decodeSubscribePayload(
585 ByteBuf buffer,
586 int bytesRemainingInVariablePart) {
587 final List<MqttTopicSubscription> subscribeTopics = new ArrayList<MqttTopicSubscription>();
588 int numberOfBytesConsumed = 0;
589 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
590 final Result<String> decodedTopicName = decodeString(buffer);
591 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
592
593 final short optionByte = buffer.readUnsignedByte();
594
595 MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03);
596 boolean noLocal = ((optionByte & 0x04) >> 2) == 1;
597 boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1;
598 RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4);
599
600 final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos,
601 noLocal,
602 retainAsPublished,
603 retainHandling);
604
605 numberOfBytesConsumed++;
606 subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption));
607 }
608 return new Result<MqttSubscribePayload>(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
609 }
610
611 private static Result<MqttSubAckPayload> decodeSubackPayload(
612 ByteBuf buffer,
613 int bytesRemainingInVariablePart) {
614 final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
615 int numberOfBytesConsumed = 0;
616 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
617 int reasonCode = buffer.readUnsignedByte();
618 numberOfBytesConsumed++;
619 grantedQos.add(reasonCode);
620 }
621 return new Result<MqttSubAckPayload>(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
622 }
623
624 private static Result<MqttUnsubAckPayload> decodeUnsubAckPayload(
625 ChannelHandlerContext ctx,
626 ByteBuf buffer,
627 int bytesRemainingInVariablePart) {
628 final List<Short> reasonCodes = new ArrayList<Short>(bytesRemainingInVariablePart);
629 int numberOfBytesConsumed = 0;
630 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
631 short reasonCode = buffer.readUnsignedByte();
632 numberOfBytesConsumed++;
633 reasonCodes.add(reasonCode);
634 }
635 return new Result<MqttUnsubAckPayload>(new MqttUnsubAckPayload(reasonCodes), numberOfBytesConsumed);
636 }
637
638 private static Result<MqttUnsubscribePayload> decodeUnsubscribePayload(
639 ByteBuf buffer,
640 int bytesRemainingInVariablePart) {
641 final List<String> unsubscribeTopics = new ArrayList<String>();
642 int numberOfBytesConsumed = 0;
643 while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
644 final Result<String> decodedTopicName = decodeString(buffer);
645 numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
646 unsubscribeTopics.add(decodedTopicName.value);
647 }
648 return new Result<MqttUnsubscribePayload>(
649 new MqttUnsubscribePayload(unsubscribeTopics),
650 numberOfBytesConsumed);
651 }
652
653 private static Result<ByteBuf> decodePublishPayload(ByteBuf buffer, int bytesRemainingInVariablePart) {
654 ByteBuf b = buffer.readRetainedSlice(bytesRemainingInVariablePart);
655 return new Result<ByteBuf>(b, bytesRemainingInVariablePart);
656 }
657
658 private static Result<String> decodeString(ByteBuf buffer) {
659 return decodeString(buffer, 0, Integer.MAX_VALUE);
660 }
661
662 private static Result<String> decodeString(ByteBuf buffer, int minBytes, int maxBytes) {
663 int size = decodeMsbLsb(buffer);
664 int numberOfBytesConsumed = 2;
665 if (size < minBytes || size > maxBytes) {
666 buffer.skipBytes(size);
667 numberOfBytesConsumed += size;
668 return new Result<String>(null, numberOfBytesConsumed);
669 }
670 String s = buffer.toString(buffer.readerIndex(), size, CharsetUtil.UTF_8);
671 buffer.skipBytes(size);
672 numberOfBytesConsumed += size;
673 return new Result<String>(s, numberOfBytesConsumed);
674 }
675
676
677
678
679
680 private static byte[] decodeByteArray(ByteBuf buffer) {
681 int size = decodeMsbLsb(buffer);
682 byte[] bytes = new byte[size];
683 buffer.readBytes(bytes);
684 return bytes;
685 }
686
687
688 private static long packInts(int a, int b) {
689 return (((long) a) << 32) | (b & 0xFFFFFFFFL);
690 }
691
692 private static int unpackA(long ints) {
693 return (int) (ints >> 32);
694 }
695
696 private static int unpackB(long ints) {
697 return (int) ints;
698 }
699
700
701
702
703 private static int decodeMsbLsb(ByteBuf buffer) {
704 int min = 0;
705 int max = 65535;
706 short msbSize = buffer.readUnsignedByte();
707 short lsbSize = buffer.readUnsignedByte();
708 int result = msbSize << 8 | lsbSize;
709 if (result < min || result > max) {
710 result = -1;
711 }
712 return result;
713 }
714
715
716
717
718
719
720
721
722 private static long decodeVariableByteInteger(ByteBuf buffer) {
723 int remainingLength = 0;
724 int multiplier = 1;
725 short digit;
726 int loops = 0;
727 do {
728 digit = buffer.readUnsignedByte();
729 remainingLength += (digit & 127) * multiplier;
730 multiplier *= 128;
731 loops++;
732 } while ((digit & 128) != 0 && loops < 4);
733
734 if (loops == 4 && (digit & 128) != 0) {
735 throw new DecoderException("MQTT protocol limits Remaining Length to 4 bytes");
736 }
737 return packInts(remainingLength, loops);
738 }
739
740 private static final class Result<T> {
741
742 private final T value;
743 private final int numberOfBytesConsumed;
744
745 Result(T value, int numberOfBytesConsumed) {
746 this.value = value;
747 this.numberOfBytesConsumed = numberOfBytesConsumed;
748 }
749 }
750
751 private static Result<MqttProperties> decodeProperties(ByteBuf buffer) {
752 final long propertiesLength = decodeVariableByteInteger(buffer);
753 int totalPropertiesLength = unpackA(propertiesLength);
754 int numberOfBytesConsumed = unpackB(propertiesLength);
755
756 MqttProperties decodedProperties = new MqttProperties();
757 while (numberOfBytesConsumed < totalPropertiesLength) {
758 long propertyId = decodeVariableByteInteger(buffer);
759 final int propertyIdValue = unpackA(propertyId);
760 numberOfBytesConsumed += unpackB(propertyId);
761 MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyIdValue);
762 switch (propertyType) {
763 case PAYLOAD_FORMAT_INDICATOR:
764 case REQUEST_PROBLEM_INFORMATION:
765 case REQUEST_RESPONSE_INFORMATION:
766 case MAXIMUM_QOS:
767 case RETAIN_AVAILABLE:
768 case WILDCARD_SUBSCRIPTION_AVAILABLE:
769 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
770 case SHARED_SUBSCRIPTION_AVAILABLE:
771 final int b1 = buffer.readUnsignedByte();
772 numberOfBytesConsumed++;
773 decodedProperties.add(new IntegerProperty(propertyIdValue, b1));
774 break;
775 case SERVER_KEEP_ALIVE:
776 case RECEIVE_MAXIMUM:
777 case TOPIC_ALIAS_MAXIMUM:
778 case TOPIC_ALIAS:
779 final int int2BytesResult = decodeMsbLsb(buffer);
780 numberOfBytesConsumed += 2;
781 decodedProperties.add(new IntegerProperty(propertyIdValue, int2BytesResult));
782 break;
783 case PUBLICATION_EXPIRY_INTERVAL:
784 case SESSION_EXPIRY_INTERVAL:
785 case WILL_DELAY_INTERVAL:
786 case MAXIMUM_PACKET_SIZE:
787 final int maxPacketSize = buffer.readInt();
788 numberOfBytesConsumed += 4;
789 decodedProperties.add(new IntegerProperty(propertyIdValue, maxPacketSize));
790 break;
791 case SUBSCRIPTION_IDENTIFIER:
792 long vbIntegerResult = decodeVariableByteInteger(buffer);
793 numberOfBytesConsumed += unpackB(vbIntegerResult);
794 decodedProperties.add(new IntegerProperty(propertyIdValue, unpackA(vbIntegerResult)));
795 break;
796 case CONTENT_TYPE:
797 case RESPONSE_TOPIC:
798 case ASSIGNED_CLIENT_IDENTIFIER:
799 case AUTHENTICATION_METHOD:
800 case RESPONSE_INFORMATION:
801 case SERVER_REFERENCE:
802 case REASON_STRING:
803 final Result<String> stringResult = decodeString(buffer);
804 numberOfBytesConsumed += stringResult.numberOfBytesConsumed;
805 decodedProperties.add(new MqttProperties.StringProperty(propertyIdValue, stringResult.value));
806 break;
807 case USER_PROPERTY:
808 final Result<String> keyResult = decodeString(buffer);
809 final Result<String> valueResult = decodeString(buffer);
810 numberOfBytesConsumed += keyResult.numberOfBytesConsumed;
811 numberOfBytesConsumed += valueResult.numberOfBytesConsumed;
812 decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value));
813 break;
814 case CORRELATION_DATA:
815 case AUTHENTICATION_DATA:
816 final byte[] binaryDataResult = decodeByteArray(buffer);
817 numberOfBytesConsumed += binaryDataResult.length + 2;
818 decodedProperties.add(new MqttProperties.BinaryProperty(propertyIdValue, binaryDataResult));
819 break;
820 default:
821
822 throw new DecoderException("Unknown property type: " + propertyType);
823 }
824 }
825
826 return new Result<MqttProperties>(decodedProperties, numberOfBytesConsumed);
827 }
828 }