Skip to content

Commit be4df2a

Browse files
committed
feat(mqtt): add mqtt version configuration in client options
- Add setMqttVersion method to MqttSessionImpl - Remove redundant setMqttVersion call in MqttClient - Change mqttVersion field access to protected in AbstractSession - Remove setMqttVersion method from AbstractSession - Update MqttSpecComplianceTest to use mqtt version in client options - Clean up lambda expressions in MqttSpecComplianceTest subscribe calls - Add blank lines for better code readability in tests
1 parent b1e6de4 commit be4df2a

File tree

4 files changed

+17
-17
lines changed

4 files changed

+17
-17
lines changed

smart-mqtt-broker/src/main/java/tech/smartboot/mqtt/broker/MqttSessionImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,4 +334,8 @@ public long getLatestReceiveMessageTime() {
334334
public void setLatestReceiveMessageTime(long latestReceiveMessageTime) {
335335
this.latestReceiveMessageTime = latestReceiveMessageTime;
336336
}
337+
338+
public final void setMqttVersion(MqttVersion mqttVersion) {
339+
this.mqttVersion = mqttVersion;
340+
}
337341
}

smart-mqtt-client/src/main/java/tech/smartboot/mqtt/client/MqttClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public MqttClient(String uri, Consumer<Options> opt) {
131131
options.setPort(MqttUtil.toInt(array[2]));
132132
opt.accept(options);
133133
this.clientId = options.getClientId();
134-
setMqttVersion(options.getMqttVersion());
134+
mqttVersion = options.getMqttVersion();
135135
}
136136

137137

smart-mqtt-common/src/main/java/tech/smartboot/mqtt/common/AbstractSession.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public abstract class AbstractSession {
3838
protected boolean disconnect = false;
3939
protected MqttWriter mqttWriter;
4040

41-
private MqttVersion mqttVersion;
41+
protected MqttVersion mqttVersion;
4242

4343
protected InflightQueue inflightQueue;
4444
private final Hashtable<Integer, MqttPublishMessage> ackMessageCacheMap = new Hashtable<>();
@@ -121,10 +121,6 @@ public final MqttVersion getMqttVersion() {
121121
return mqttVersion;
122122
}
123123

124-
public final void setMqttVersion(MqttVersion mqttVersion) {
125-
this.mqttVersion = mqttVersion;
126-
}
127-
128124
public final void setInflightQueue(InflightQueue inflightQueue) {
129125
this.inflightQueue = inflightQueue;
130126
}

smart-mqtt-test/src/test/java/org/smartboot/mqtt/test/MqttSpecComplianceTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -388,8 +388,7 @@ public void testSharedSubscription_MQTT5() throws InterruptedException, Executio
388388
int numSubscribers = 3;
389389
int numMessages = numSubscribers * 2; // Send enough messages to likely hit all subscribers
390390

391-
MqttClient publisher = new MqttClient(host, port);
392-
publisher.setMqttVersion(MqttVersion.MQTT_5);
391+
MqttClient publisher = new MqttClient(host, port, options -> options.setMqttVersion(MqttVersion.MQTT_5));
393392
CompletableFuture<Void> pubConnectFuture = new CompletableFuture<>();
394393
publisher.connect(connAck -> {
395394
if (connAck.getVariableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
@@ -407,9 +406,8 @@ public void testSharedSubscription_MQTT5() throws InterruptedException, Executio
407406
for (int i = 0; i < numSubscribers; i++) {
408407
int j = i;
409408
MqttClient subscriber = new MqttClient(host, port, options -> {
410-
options.setClientId("shared-sub-" + j);
409+
options.setClientId("shared-sub-" + j).setMqttVersion(MqttVersion.MQTT_5);
411410
});
412-
subscriber.setMqttVersion(MqttVersion.MQTT_5);
413411
subscribers.add(subscriber);
414412
CompletableFuture<String> messageFuture = new CompletableFuture<>();
415413
messageFutures.add(messageFuture); // We expect each subscriber to get at least one message
@@ -631,7 +629,8 @@ public void testTopicWildcard_InvalidSubscription() throws InterruptedException,
631629

632630
// Test wildcard in the middle of a topic level (invalid)
633631
CompletableFuture<Boolean> invalidWildcardFuture = new CompletableFuture<>();
634-
subscriber.subscribe("sport/+/tennis", MqttQoS.AT_MOST_ONCE, (client, msg) -> {}, (client, qos) -> {
632+
subscriber.subscribe("sport/+/tennis", MqttQoS.AT_MOST_ONCE, (client, msg) -> {
633+
}, (client, qos) -> {
635634
// This subscription should be rejected or handled properly
636635
invalidWildcardFuture.complete(true);
637636
});
@@ -645,7 +644,8 @@ public void testTopicWildcard_InvalidSubscription() throws InterruptedException,
645644

646645
// Test hash wildcard not at the end (invalid)
647646
CompletableFuture<Boolean> invalidHashFuture = new CompletableFuture<>();
648-
subscriber.subscribe("sport/#/tennis", MqttQoS.AT_MOST_ONCE, (client, msg) -> {}, (client, qos) -> {
647+
subscriber.subscribe("sport/#/tennis", MqttQoS.AT_MOST_ONCE, (client, msg) -> {
648+
}, (client, qos) -> {
649649
invalidHashFuture.complete(true);
650650
});
651651

@@ -667,7 +667,7 @@ public void testTopicWildcard_MixedSubscriptions() throws InterruptedException,
667667
// Test subscribing to both specific topic and wildcard topic
668668
String specificTopic = "sport/tennis/results";
669669
String wildcardTopic = "sport/tennis/+";
670-
670+
671671
String matchingTopic1 = "sport/tennis/results";
672672
String matchingTopic2 = "sport/tennis/scores";
673673

@@ -687,7 +687,7 @@ public void testTopicWildcard_MixedSubscriptions() throws InterruptedException,
687687
specificMessageFuture.complete(payload);
688688
}
689689
});
690-
690+
691691
// Subscribe to wildcard topic
692692
subscriber.subscribe(wildcardTopic, MqttQoS.AT_MOST_ONCE, (client, msg) -> {
693693
String topic = msg.getVariableHeader().getTopicName();
@@ -697,7 +697,7 @@ public void testTopicWildcard_MixedSubscriptions() throws InterruptedException,
697697
}
698698
});
699699
});
700-
700+
701701
Thread.sleep(500); // Allow subscriptions to complete
702702

703703
// Publish to matching topics
@@ -736,11 +736,11 @@ public void testTopicWildcard_EmptyLevel() throws InterruptedException, Executio
736736
}
737737
});
738738
});
739-
739+
740740
Thread.sleep(500); // Allow subscription to complete
741741

742742
publisher.publish(matchingTopic, MqttQoS.AT_MOST_ONCE, "empty_level_payload".getBytes());
743-
743+
744744
// Check if message with empty level is received
745745
try {
746746
Assert.assertEquals("empty_level_payload", messageFuture.get(5, TimeUnit.SECONDS));

0 commit comments

Comments
 (0)