1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty.handler.codec.mqtt;
18
19 import io.netty.buffer.ByteBuf;
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.handler.codec.EncoderException;
25 import io.netty.handler.codec.MessageToMessageEncoder;
26 import io.netty.util.internal.EmptyArrays;
27
28 import java.util.List;
29
30 import static io.netty.buffer.ByteBufUtil.*;
31 import static io.netty.handler.codec.mqtt.MqttCodecUtil.getMqttVersion;
32 import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidClientId;
33 import static io.netty.handler.codec.mqtt.MqttCodecUtil.setMqttVersion;
34 import static io.netty.handler.codec.mqtt.MqttConstant.DEFAULT_MAX_CLIENT_ID_LENGTH;
35
36
37
38
39
40
41
42 @ChannelHandler.Sharable
43 public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
44
/**
 * Not instantiable from outside this class; use {@link #INSTANCE}.
 */
private MqttEncoder() {
    // Intentionally empty.
}

/**
 * The single encoder instance created by this class.
 */
public static final MqttEncoder INSTANCE = new MqttEncoder();
48
49 @Override
50 protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
51 out.add(doEncode(ctx, msg));
52 }
53
54
55
56
57
58
59
60
61 static ByteBuf doEncode(ChannelHandlerContext ctx,
62 MqttMessage message) {
63
64 switch (message.fixedHeader().messageType()) {
65 case CONNECT:
66 return encodeConnectMessage(ctx, (MqttConnectMessage) message);
67
68 case CONNACK:
69 return encodeConnAckMessage(ctx, (MqttConnAckMessage) message);
70
71 case PUBLISH:
72 return encodePublishMessage(ctx, (MqttPublishMessage) message);
73
74 case SUBSCRIBE:
75 return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message);
76
77 case UNSUBSCRIBE:
78 return encodeUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message);
79
80 case SUBACK:
81 return encodeSubAckMessage(ctx, (MqttSubAckMessage) message);
82
83 case UNSUBACK:
84 if (message instanceof MqttUnsubAckMessage) {
85 return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message);
86 }
87 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
88
89 case PUBACK:
90 case PUBREC:
91 case PUBREL:
92 case PUBCOMP:
93 return encodePubReplyMessage(ctx, message);
94
95 case DISCONNECT:
96 case AUTH:
97 return encodeReasonCodePlusPropertiesMessage(ctx, message);
98
99 case PINGREQ:
100 case PINGRESP:
101 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
102
103 default:
104 throw new IllegalArgumentException(
105 "Unknown message type: " + message.fixedHeader().messageType().value());
106 }
107 }
108
109 private static ByteBuf encodeConnectMessage(
110 ChannelHandlerContext ctx,
111 MqttConnectMessage message) {
112 int payloadBufferSize = 0;
113
114 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
115 MqttConnectVariableHeader variableHeader = message.variableHeader();
116 MqttConnectPayload payload = message.payload();
117 MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
118 (byte) variableHeader.version());
119 setMqttVersion(ctx, mqttVersion);
120
121
122 if (!variableHeader.hasUserName() && variableHeader.hasPassword()) {
123 throw new EncoderException("Without a username, the password MUST be not set");
124 }
125
126
127 String clientIdentifier = payload.clientIdentifier();
128 if (!isValidClientId(mqttVersion, DEFAULT_MAX_CLIENT_ID_LENGTH, clientIdentifier)) {
129 throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
130 }
131 int clientIdentifierBytes = utf8Bytes(clientIdentifier);
132 payloadBufferSize += 2 + clientIdentifierBytes;
133
134
135 String willTopic = payload.willTopic();
136 int willTopicBytes = nullableUtf8Bytes(willTopic);
137 byte[] willMessage = payload.willMessageInBytes();
138 byte[] willMessageBytes = willMessage != null ? willMessage : EmptyArrays.EMPTY_BYTES;
139 if (variableHeader.isWillFlag()) {
140 payloadBufferSize += 2 + willTopicBytes;
141 payloadBufferSize += 2 + willMessageBytes.length;
142 }
143
144 String userName = payload.userName();
145 int userNameBytes = nullableUtf8Bytes(userName);
146 if (variableHeader.hasUserName()) {
147 payloadBufferSize += 2 + userNameBytes;
148 }
149
150 byte[] password = payload.passwordInBytes();
151 byte[] passwordBytes = password != null ? password : EmptyArrays.EMPTY_BYTES;
152 if (variableHeader.hasPassword()) {
153 payloadBufferSize += 2 + passwordBytes.length;
154 }
155
156
157 byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
158 ByteBuf propertiesBuf = encodePropertiesIfNeeded(
159 mqttVersion,
160 ctx.alloc(),
161 message.variableHeader().properties());
162 try {
163 final ByteBuf willPropertiesBuf;
164 if (variableHeader.isWillFlag()) {
165 willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties());
166 payloadBufferSize += willPropertiesBuf.readableBytes();
167 } else {
168 willPropertiesBuf = Unpooled.EMPTY_BUFFER;
169 }
170 try {
171 int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes();
172
173 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
174 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
175 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
176 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
177 writeVariableLengthInt(buf, variablePartSize);
178
179 buf.writeShort(protocolNameBytes.length);
180 buf.writeBytes(protocolNameBytes);
181
182 buf.writeByte(variableHeader.version());
183 buf.writeByte(getConnVariableHeaderFlag(variableHeader));
184 buf.writeShort(variableHeader.keepAliveTimeSeconds());
185 buf.writeBytes(propertiesBuf);
186
187
188 writeExactUTF8String(buf, clientIdentifier, clientIdentifierBytes);
189 if (variableHeader.isWillFlag()) {
190 buf.writeBytes(willPropertiesBuf);
191 writeExactUTF8String(buf, willTopic, willTopicBytes);
192 buf.writeShort(willMessageBytes.length);
193 buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
194 }
195 if (variableHeader.hasUserName()) {
196 writeExactUTF8String(buf, userName, userNameBytes);
197 }
198 if (variableHeader.hasPassword()) {
199 buf.writeShort(passwordBytes.length);
200 buf.writeBytes(passwordBytes, 0, passwordBytes.length);
201 }
202 return buf;
203 } finally {
204 willPropertiesBuf.release();
205 }
206 } finally {
207 propertiesBuf.release();
208 }
209 }
210
211 private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
212 int flagByte = 0;
213 if (variableHeader.hasUserName()) {
214 flagByte |= 0x80;
215 }
216 if (variableHeader.hasPassword()) {
217 flagByte |= 0x40;
218 }
219 if (variableHeader.isWillRetain()) {
220 flagByte |= 0x20;
221 }
222 flagByte |= (variableHeader.willQos() & 0x03) << 3;
223 if (variableHeader.isWillFlag()) {
224 flagByte |= 0x04;
225 }
226 if (variableHeader.isCleanSession()) {
227 flagByte |= 0x02;
228 }
229 return flagByte;
230 }
231
232 private static ByteBuf encodeConnAckMessage(
233 ChannelHandlerContext ctx,
234 MqttConnAckMessage message) {
235 final MqttVersion mqttVersion = getMqttVersion(ctx);
236 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
237 ctx.alloc(),
238 message.variableHeader().properties());
239
240 try {
241 ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes());
242 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
243 writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes());
244 buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
245 buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
246 buf.writeBytes(propertiesBuf);
247 return buf;
248 } finally {
249 propertiesBuf.release();
250 }
251 }
252
253 private static ByteBuf encodeSubscribeMessage(
254 ChannelHandlerContext ctx,
255 MqttSubscribeMessage message) {
256 MqttVersion mqttVersion = getMqttVersion(ctx);
257 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
258 ctx.alloc(),
259 message.idAndPropertiesVariableHeader().properties());
260
261 try {
262 final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
263 int payloadBufferSize = 0;
264
265 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
266 MqttMessageIdVariableHeader variableHeader = message.variableHeader();
267 MqttSubscribePayload payload = message.payload();
268
269 for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
270 String topicName = topic.topicName();
271 int topicNameBytes = utf8Bytes(topicName);
272 payloadBufferSize += 2 + topicNameBytes;
273 payloadBufferSize += 1;
274 }
275
276 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
277 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
278
279 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
280 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
281 writeVariableLengthInt(buf, variablePartSize);
282
283
284 int messageId = variableHeader.messageId();
285 buf.writeShort(messageId);
286 buf.writeBytes(propertiesBuf);
287
288
289 for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
290 writeUnsafeUTF8String(buf, topic.topicName());
291 if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) {
292 buf.writeByte(topic.qualityOfService().value());
293 } else {
294 final MqttSubscriptionOption option = topic.option();
295
296 int optionEncoded = option.retainHandling().value() << 4;
297 if (option.isRetainAsPublished()) {
298 optionEncoded |= 0x08;
299 }
300 if (option.isNoLocal()) {
301 optionEncoded |= 0x04;
302 }
303 optionEncoded |= option.qos().value();
304
305 buf.writeByte(optionEncoded);
306 }
307 }
308
309 return buf;
310 } finally {
311 propertiesBuf.release();
312 }
313 }
314
315 private static ByteBuf encodeUnsubscribeMessage(
316 ChannelHandlerContext ctx,
317 MqttUnsubscribeMessage message) {
318 MqttVersion mqttVersion = getMqttVersion(ctx);
319 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
320 ctx.alloc(),
321 message.idAndPropertiesVariableHeader().properties());
322
323 try {
324 final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
325 int payloadBufferSize = 0;
326
327 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
328 MqttMessageIdVariableHeader variableHeader = message.variableHeader();
329 MqttUnsubscribePayload payload = message.payload();
330
331 for (String topicName : payload.topics()) {
332 int topicNameBytes = utf8Bytes(topicName);
333 payloadBufferSize += 2 + topicNameBytes;
334 }
335
336 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
337 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
338
339 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
340 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
341 writeVariableLengthInt(buf, variablePartSize);
342
343
344 int messageId = variableHeader.messageId();
345 buf.writeShort(messageId);
346 buf.writeBytes(propertiesBuf);
347
348
349 for (String topicName : payload.topics()) {
350 writeUnsafeUTF8String(buf, topicName);
351 }
352
353 return buf;
354 } finally {
355 propertiesBuf.release();
356 }
357 }
358
359 private static ByteBuf encodeSubAckMessage(
360 ChannelHandlerContext ctx,
361 MqttSubAckMessage message) {
362 MqttVersion mqttVersion = getMqttVersion(ctx);
363 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
364 ctx.alloc(),
365 message.idAndPropertiesVariableHeader().properties());
366 try {
367 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
368 int payloadBufferSize = message.payload().grantedQoSLevels().size();
369 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
370 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
371 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
372 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
373 writeVariableLengthInt(buf, variablePartSize);
374 buf.writeShort(message.variableHeader().messageId());
375 buf.writeBytes(propertiesBuf);
376 for (int code: message.payload().reasonCodes()) {
377 buf.writeByte(code);
378 }
379
380 return buf;
381 } finally {
382 propertiesBuf.release();
383 }
384 }
385
386 private static ByteBuf encodeUnsubAckMessage(
387 ChannelHandlerContext ctx,
388 MqttUnsubAckMessage message) {
389 if (message.variableHeader() instanceof MqttMessageIdAndPropertiesVariableHeader) {
390 MqttVersion mqttVersion = getMqttVersion(ctx);
391 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
392 ctx.alloc(),
393 message.idAndPropertiesVariableHeader().properties());
394 try {
395 int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
396 MqttUnsubAckPayload payload = message.payload();
397 int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
398 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
399 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
400 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
401 buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
402 writeVariableLengthInt(buf, variablePartSize);
403 buf.writeShort(message.variableHeader().messageId());
404 buf.writeBytes(propertiesBuf);
405
406 if (payload != null) {
407 for (Short reasonCode : payload.unsubscribeReasonCodes()) {
408 buf.writeByte(reasonCode);
409 }
410 }
411
412 return buf;
413 } finally {
414 propertiesBuf.release();
415 }
416 } else {
417 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
418 }
419 }
420
421 private static ByteBuf encodePublishMessage(
422 ChannelHandlerContext ctx,
423 MqttPublishMessage message) {
424 MqttVersion mqttVersion = getMqttVersion(ctx);
425 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
426 MqttPublishVariableHeader variableHeader = message.variableHeader();
427 ByteBuf payload = message.payload().duplicate();
428
429 String topicName = variableHeader.topicName();
430 int topicNameBytes = utf8Bytes(topicName);
431
432 ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion,
433 ctx.alloc(),
434 message.variableHeader().properties());
435
436 try {
437 int variableHeaderBufferSize = 2 + topicNameBytes +
438 (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0) + propertiesBuf.readableBytes();
439 int payloadBufferSize = payload.readableBytes();
440 int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
441 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
442
443 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
444 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
445 writeVariableLengthInt(buf, variablePartSize);
446 writeExactUTF8String(buf, topicName, topicNameBytes);
447 if (mqttFixedHeader.qosLevel().value() > 0) {
448 buf.writeShort(variableHeader.packetId());
449 }
450 buf.writeBytes(propertiesBuf);
451 buf.writeBytes(payload);
452
453 return buf;
454 } finally {
455 propertiesBuf.release();
456 }
457 }
458
459 private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx,
460 MqttMessage message) {
461 if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) {
462 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
463 MqttPubReplyMessageVariableHeader variableHeader =
464 (MqttPubReplyMessageVariableHeader) message.variableHeader();
465 int msgId = variableHeader.messageId();
466
467 final ByteBuf propertiesBuf;
468 final boolean includeReasonCode;
469 final int variableHeaderBufferSize;
470 final MqttVersion mqttVersion = getMqttVersion(ctx);
471 if (mqttVersion == MqttVersion.MQTT_5 &&
472 (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK ||
473 !variableHeader.properties().isEmpty())) {
474 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
475 includeReasonCode = true;
476 variableHeaderBufferSize = 3 + propertiesBuf.readableBytes();
477 } else {
478 propertiesBuf = Unpooled.EMPTY_BUFFER;
479 includeReasonCode = false;
480 variableHeaderBufferSize = 2;
481 }
482
483 try {
484 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
485 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
486 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
487 writeVariableLengthInt(buf, variableHeaderBufferSize);
488 buf.writeShort(msgId);
489 if (includeReasonCode) {
490 buf.writeByte(variableHeader.reasonCode());
491 }
492 buf.writeBytes(propertiesBuf);
493
494 return buf;
495 } finally {
496 propertiesBuf.release();
497 }
498 } else {
499 return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message);
500 }
501 }
502
503 private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
504 ByteBufAllocator byteBufAllocator,
505 MqttMessage message) {
506 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
507 MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
508 int msgId = variableHeader.messageId();
509
510 int variableHeaderBufferSize = 2;
511 int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
512 ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
513 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
514 writeVariableLengthInt(buf, variableHeaderBufferSize);
515 buf.writeShort(msgId);
516
517 return buf;
518 }
519
520 private static ByteBuf encodeReasonCodePlusPropertiesMessage(
521 ChannelHandlerContext ctx,
522 MqttMessage message) {
523 if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) {
524 MqttVersion mqttVersion = getMqttVersion(ctx);
525 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
526 MqttReasonCodeAndPropertiesVariableHeader variableHeader =
527 (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader();
528
529 final ByteBuf propertiesBuf;
530 final boolean includeReasonCode;
531 final int variableHeaderBufferSize;
532 if (mqttVersion == MqttVersion.MQTT_5 &&
533 (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK ||
534 !variableHeader.properties().isEmpty())) {
535 propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties());
536 includeReasonCode = true;
537 variableHeaderBufferSize = 1 + propertiesBuf.readableBytes();
538 } else {
539 propertiesBuf = Unpooled.EMPTY_BUFFER;
540 includeReasonCode = false;
541 variableHeaderBufferSize = 0;
542 }
543
544 try {
545 final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
546 ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
547 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
548 writeVariableLengthInt(buf, variableHeaderBufferSize);
549 if (includeReasonCode) {
550 buf.writeByte(variableHeader.reasonCode());
551 }
552 buf.writeBytes(propertiesBuf);
553
554 return buf;
555 } finally {
556 propertiesBuf.release();
557 }
558 } else {
559 return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message);
560 }
561 }
562
563 private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
564 ByteBufAllocator byteBufAllocator,
565 MqttMessage message) {
566 MqttFixedHeader mqttFixedHeader = message.fixedHeader();
567 ByteBuf buf = byteBufAllocator.buffer(2);
568 buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
569 buf.writeByte(0);
570
571 return buf;
572 }
573
574 private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion,
575 ByteBufAllocator byteBufAllocator,
576 MqttProperties mqttProperties) {
577 if (mqttVersion == MqttVersion.MQTT_5) {
578 return encodeProperties(byteBufAllocator, mqttProperties);
579 }
580 return Unpooled.EMPTY_BUFFER;
581 }
582
583 private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator,
584 MqttProperties mqttProperties) {
585 ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer();
586
587 try {
588 ByteBuf propertiesBuf = byteBufAllocator.buffer();
589 try {
590 for (MqttProperties.MqttProperty property : mqttProperties.listAll()) {
591 MqttProperties.MqttPropertyType propertyType =
592 MqttProperties.MqttPropertyType.valueOf(property.propertyId);
593 switch (propertyType) {
594 case PAYLOAD_FORMAT_INDICATOR:
595 case REQUEST_PROBLEM_INFORMATION:
596 case REQUEST_RESPONSE_INFORMATION:
597 case MAXIMUM_QOS:
598 case RETAIN_AVAILABLE:
599 case WILDCARD_SUBSCRIPTION_AVAILABLE:
600 case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
601 case SHARED_SUBSCRIPTION_AVAILABLE:
602 writeVariableLengthInt(propertiesBuf, property.propertyId);
603 final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue();
604 propertiesBuf.writeByte(bytePropValue);
605 break;
606 case SERVER_KEEP_ALIVE:
607 case RECEIVE_MAXIMUM:
608 case TOPIC_ALIAS_MAXIMUM:
609 case TOPIC_ALIAS:
610 writeVariableLengthInt(propertiesBuf, property.propertyId);
611 final short twoBytesInPropValue =
612 ((MqttProperties.IntegerProperty) property).value.shortValue();
613 propertiesBuf.writeShort(twoBytesInPropValue);
614 break;
615 case PUBLICATION_EXPIRY_INTERVAL:
616 case SESSION_EXPIRY_INTERVAL:
617 case WILL_DELAY_INTERVAL:
618 case MAXIMUM_PACKET_SIZE:
619 writeVariableLengthInt(propertiesBuf, property.propertyId);
620 final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value;
621 propertiesBuf.writeInt(fourBytesIntPropValue);
622 break;
623 case SUBSCRIPTION_IDENTIFIER:
624 writeVariableLengthInt(propertiesBuf, property.propertyId);
625 final int vbi = ((MqttProperties.IntegerProperty) property).value;
626 writeVariableLengthInt(propertiesBuf, vbi);
627 break;
628 case CONTENT_TYPE:
629 case RESPONSE_TOPIC:
630 case ASSIGNED_CLIENT_IDENTIFIER:
631 case AUTHENTICATION_METHOD:
632 case RESPONSE_INFORMATION:
633 case SERVER_REFERENCE:
634 case REASON_STRING:
635 writeVariableLengthInt(propertiesBuf, property.propertyId);
636 writeEagerUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value);
637 break;
638 case USER_PROPERTY:
639 final List<MqttProperties.StringPair> pairs =
640 ((MqttProperties.UserProperties) property).value;
641 for (MqttProperties.StringPair pair : pairs) {
642 writeVariableLengthInt(propertiesBuf, property.propertyId);
643 writeEagerUTF8String(propertiesBuf, pair.key);
644 writeEagerUTF8String(propertiesBuf, pair.value);
645 }
646 break;
647 case CORRELATION_DATA:
648 case AUTHENTICATION_DATA:
649 writeVariableLengthInt(propertiesBuf, property.propertyId);
650 final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value;
651 propertiesBuf.writeShort(binaryPropValue.length);
652 propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length);
653 break;
654 default:
655
656 throw new EncoderException("Unknown property type: " + propertyType);
657 }
658 }
659 writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes());
660 propertiesHeaderBuf.writeBytes(propertiesBuf);
661
662 return propertiesHeaderBuf;
663 } finally {
664 propertiesBuf.release();
665 }
666 } catch (RuntimeException e) {
667 propertiesHeaderBuf.release();
668 throw e;
669 }
670 }
671
672 private static int getFixedHeaderByte1(MqttFixedHeader header) {
673 int ret = 0;
674 ret |= header.messageType().value() << 4;
675 if (header.isDup()) {
676 ret |= 0x08;
677 }
678 ret |= header.qosLevel().value() << 1;
679 if (header.isRetain()) {
680 ret |= 0x01;
681 }
682 return ret;
683 }
684
685 private static void writeVariableLengthInt(ByteBuf buf, int num) {
686 do {
687 int digit = num % 128;
688 num /= 128;
689 if (num > 0) {
690 digit |= 0x80;
691 }
692 buf.writeByte(digit);
693 } while (num > 0);
694 }
695
696 private static int nullableUtf8Bytes(String s) {
697 return s == null? 0 : utf8Bytes(s);
698 }
699
700 private static int nullableMaxUtf8Bytes(String s) {
701 return s == null? 0 : utf8MaxBytes(s);
702 }
703
704 private static void writeExactUTF8String(ByteBuf buf, String s, int utf8Length) {
705 buf.ensureWritable(utf8Length + 2);
706 buf.writeShort(utf8Length);
707 if (utf8Length > 0) {
708 final int writtenUtf8Length = reserveAndWriteUtf8(buf, s, utf8Length);
709 assert writtenUtf8Length == utf8Length;
710 }
711 }
712
713 private static void writeEagerUTF8String(ByteBuf buf, String s) {
714 final int maxUtf8Length = nullableMaxUtf8Bytes(s);
715 buf.ensureWritable(maxUtf8Length + 2);
716 final int writerIndex = buf.writerIndex();
717 final int startUtf8String = writerIndex + 2;
718 buf.writerIndex(startUtf8String);
719 final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, maxUtf8Length) : 0;
720 buf.setShort(writerIndex, utf8Length);
721 }
722
723 private static void writeUnsafeUTF8String(ByteBuf buf, String s) {
724 final int writerIndex = buf.writerIndex();
725 final int startUtf8String = writerIndex + 2;
726
727 buf.writerIndex(startUtf8String);
728 final int utf8Length = s != null? reserveAndWriteUtf8(buf, s, 0) : 0;
729 buf.setShort(writerIndex, utf8Length);
730 }
731
732 private static int getVariableLengthInt(int num) {
733 int count = 0;
734 do {
735 num /= 128;
736 count++;
737 } while (num > 0);
738 return count;
739 }
740
741 }