Skip to content

Commit b1e6de4

Browse files
committed
refactor(mqtt): update decode methods to include MQTT version parameter
- Modified decodeVariableHeader0 methods across multiple MQTT message classes to accept MqttVersion - Removed deprecated setVersion method from MqttMessage class - Updated MqttVariableMessage and OnlyFixedHeaderMessage to pass MqttVersion during decoding - Adjusted AbstractSession and MqttHeaderDecoder to remove obsolete version setting logic - Ensured consistent handling of MQTT protocol versions during message decoding - Added missing import for MqttVersion in relevant classes
1 parent f2142cb commit b1e6de4

15 files changed

+19
-22
lines changed

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/AbstractSession.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ public final void notifyPubComp(int packetId) {
7272
public void write(MqttMessage mqttMessage, boolean autoFlush) {
7373
ValidateUtils.isTrue(!disconnect, "已断开连接,无法发送消息");
7474
try {
75-
mqttMessage.setVersion(mqttVersion);
7675
synchronized (mqttWriter) {
7776
if (disconnect) {
7877
ValidateUtils.throwException("session is disconnect");

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/MqttHeaderDecoder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ public Decoder decode(ByteBuffer buffer, AbstractSession session) {
8383

8484
MqttFixedHeader mqttFixedHeader = MqttFixedHeader.getInstance(messageType, dupFlag, qosLevel, retain);
8585
session.mqttMessage = newMessage(mqttFixedHeader);
86-
session.mqttMessage.setVersion(session.getMqttVersion());
8786
session.mqttMessage.setRemainingLength(remainingLength);
8887
return mqttPayloadDecoder.decode(buffer, session);
8988
}

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/MqttPayloadDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public Decoder decode(ByteBuffer buffer, AbstractSession session) {
4646
return this;
4747
}
4848
int p = payloadBuffer.position();
49-
session.mqttMessage.decodeVariableHeader(payloadBuffer);
49+
session.mqttMessage.decodeVariableHeader(payloadBuffer, session.getMqttVersion());
5050
session.mqttMessage.decodePlayLoad(payloadBuffer);
5151
ValidateUtils.isTrue((payloadBuffer.position() - p) == remainingLength, "Payload size is wrong");
5252
session.disposableBuffer = null;

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/message/MqttConnAckMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public MqttConnAckMessage(MqttConnAckVariableHeader mqttConnAckVariableHeader) {
3434
}
3535

3636
@Override
37-
public void decodeVariableHeader0(ByteBuffer buffer) {
37+
public void decodeVariableHeader0(ByteBuffer buffer, MqttVersion version) {
3838
final boolean sessionPresent = (buffer.get() & 0x01) == 0x01;
3939
byte returnCode = buffer.get();
4040

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/message/MqttConnectMessage.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class MqttConnectMessage extends MqttVariableMessage<MqttConnectVariableH
3333
* 有效载荷
3434
*/
3535
private MqttConnectPayload mqttConnectPayload;
36+
private MqttVersion version;
3637

3738
public MqttConnectMessage(MqttFixedHeader mqttFixedHeader) {
3839
super(mqttFixedHeader);
@@ -45,7 +46,7 @@ public MqttConnectMessage(MqttConnectVariableHeader mqttConnectVariableHeader, M
4546
}
4647

4748
@Override
48-
public void decodeVariableHeader0(ByteBuffer buffer) {
49+
public void decodeVariableHeader0(ByteBuffer buffer, MqttVersion mqttVersion) {
4950
//协议名
5051
//协议名是表示协议名 MQTT 的 UTF-8 编码的字符串。
5152
//MQTT 规范的后续版本不会改变这个字符串的偏移和长度。

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/message/MqttDisconnectMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public MqttDisconnectMessage(MqttDisconnectVariableHeader mqttConnAckVariableHea
2929
}
3030

3131
@Override
32-
protected void decodeVariableHeader0(ByteBuffer buffer) {
32+
protected void decodeVariableHeader0(ByteBuffer buffer, final MqttVersion version) {
3333
if (version == MqttVersion.MQTT_5) {
3434
byte returnCode = buffer.get();
3535
DisConnectProperties properties = new DisConnectProperties();

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/message/MqttMessage.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ protected void writeTo(MqttWriter mqttWriter) {
4040
* 固定报头
4141
*/
4242
protected final MqttFixedHeader fixedHeader;
43-
protected MqttVersion version;
4443
/**
4544
* 剩余长度
4645
*/
@@ -59,7 +58,7 @@ public final MqttFixedHeader getFixedHeader() {
5958
*
6059
* @param buffer
6160
*/
62-
public abstract void decodeVariableHeader(ByteBuffer buffer);
61+
public abstract void decodeVariableHeader(ByteBuffer buffer, MqttVersion mqttVersion);
6362

6463
public void decodePlayLoad(ByteBuffer buffer) {
6564

@@ -110,8 +109,4 @@ public int getRemainingLength() {
110109
public void setRemainingLength(int remainingLength) {
111110
this.remainingLength = remainingLength;
112111
}
113-
114-
public final void setVersion(MqttVersion version) {
115-
this.version = version;
116-
}
117112
}

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/message/MqttPubQosMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public MqttPubQosMessage(MqttFixedHeader pubRecHeader, MqttPubQosVariableHeader
3232
}
3333

3434
@Override
35-
protected final void decodeVariableHeader0(ByteBuffer buffer) {
35+
protected final void decodeVariableHeader0(ByteBuffer buffer, MqttVersion version) {
3636
int packetId = decodeMessageId(buffer);
3737
MqttPubQosVariableHeader header = null;
3838
if (version == MqttVersion.MQTT_5) {

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/message/MqttPublishMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public MqttPublishMessage(MqttFixedHeader mqttFixedHeader, MqttPublishVariableHe
3737
}
3838

3939
@Override
40-
public void decodeVariableHeader0(ByteBuffer buffer) {
40+
public void decodeVariableHeader0(ByteBuffer buffer, final MqttVersion version) {
4141
final String topic = TopicByteTree.DEFAULT_INSTANCE.search(buffer);
4242
int packetId = -1;
4343
//只有当 QoS 等级是 1 或 2 时,报文标识符(Packet Identifier)字段才能出现在 PUBLISH 报文中。

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/message/MqttSubAckMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public MqttSubAckMessage(MqttReasonVariableHeader variableHeader) {
3535
}
3636

3737
@Override
38-
protected void decodeVariableHeader0(ByteBuffer buffer) {
38+
protected void decodeVariableHeader0(ByteBuffer buffer, final MqttVersion version) {
3939
int packetId = decodeMessageId(buffer);
4040
MqttReasonVariableHeader header;
4141
if (version == MqttVersion.MQTT_5) {

0 commit comments

Comments
 (0)