Skip to content

Commit ccf9730

Browse files
authored
[ISSUE #9789] LitePullConsumer supports manually adding subscription reported in Heartbeat (#9790)
1 parent a68a5bf commit ccf9730

File tree

3 files changed

+29
-9
lines changed

3 files changed

+29
-9
lines changed

client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.rocketmq.client.consumer;
1818

1919
import java.util.Collection;
20+
import java.util.HashSet;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Set;
@@ -33,11 +34,13 @@
3334
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
3435
import org.apache.rocketmq.common.message.MessageExt;
3536
import org.apache.rocketmq.common.message.MessageQueue;
37+
import org.apache.rocketmq.logging.org.slf4j.Logger;
38+
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3639
import org.apache.rocketmq.remoting.RPCHook;
3740
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
41+
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
3842
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
39-
import org.apache.rocketmq.logging.org.slf4j.Logger;
40-
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
43+
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
4144

4245
import static org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData.SUB_ALL;
4346

@@ -171,6 +174,8 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
171174

172175
private RPCHook rpcHook;
173176

177+
private final Set<SubscriptionData> subscriptionsForHeartbeat = new HashSet<>();
178+
174179
/**
175180
* Default constructor.
176181
*/
@@ -618,4 +623,17 @@ public boolean isEnableMsgTrace() {
618623
public void setEnableMsgTrace(boolean enableMsgTrace) {
619624
this.enableTrace = enableMsgTrace;
620625
}
626+
627+
public Set<SubscriptionData> getSubscriptionsForHeartbeat() {
628+
return this.subscriptionsForHeartbeat;
629+
}
630+
631+
public synchronized void buildSubscriptionsForHeartbeat(Map<String, MessageSelector> messageSelectorMap) throws Exception {
632+
this.subscriptionsForHeartbeat.clear();
633+
for (Map.Entry<String, MessageSelector> entry : messageSelectorMap.entrySet()) {
634+
SubscriptionData subscriptionData = FilterAPI.build(entry.getKey(),
635+
entry.getValue().getExpression(), entry.getValue().getExpressionType());
636+
this.subscriptionsForHeartbeat.add(subscriptionData);
637+
}
638+
}
621639
}

client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
*/
1717
package org.apache.rocketmq.client.consumer;
1818

19-
import org.apache.rocketmq.client.exception.MQClientException;
20-
import org.apache.rocketmq.common.message.MessageExt;
21-
import org.apache.rocketmq.common.message.MessageQueue;
22-
2319
import java.util.Collection;
2420
import java.util.List;
2521
import java.util.Map;
2622
import java.util.Set;
23+
import org.apache.rocketmq.client.exception.MQClientException;
24+
import org.apache.rocketmq.common.message.MessageExt;
25+
import org.apache.rocketmq.common.message.MessageQueue;
2726

2827
public interface LitePullConsumer {
2928

@@ -107,6 +106,8 @@ public interface LitePullConsumer {
107106
*/
108107
void setSubExpressionForAssign(final String topic, final String subExpression);
109108

109+
void buildSubscriptionsForHeartbeat(Map<String, MessageSelector> subExpressionMap) throws Exception;
110+
110111
/**
111112
* Fetch data for the topics or partitions specified using assign API
112113
*

client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
import org.apache.rocketmq.common.message.MessageExt;
6464
import org.apache.rocketmq.common.message.MessageQueue;
6565
import org.apache.rocketmq.common.sysflag.PullSysFlag;
66+
import org.apache.rocketmq.logging.org.slf4j.Logger;
67+
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
6668
import org.apache.rocketmq.remoting.RPCHook;
6769
import org.apache.rocketmq.remoting.exception.RemotingException;
6870
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
@@ -73,8 +75,6 @@
7375
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
7476
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
7577
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
76-
import org.apache.rocketmq.logging.org.slf4j.Logger;
77-
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
7878

7979
public class DefaultLitePullConsumerImpl implements MQConsumerInner {
8080

@@ -1122,7 +1122,8 @@ public Set<SubscriptionData> subscriptions() {
11221122
Set<SubscriptionData> subSet = new HashSet<>();
11231123

11241124
subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
1125-
1125+
subSet.addAll(this.defaultLitePullConsumer.getSubscriptionsForHeartbeat());
1126+
11261127
return subSet;
11271128
}
11281129

0 commit comments

Comments
 (0)