@shibd shibd commented Feb 28, 2025

Motivation

When a consumer that subscribes to multiple topics is created with a listener, messages might be lost.

After #219, when a message reaches the listener and the consumer is null, the message is ignored and therefore lost.

// `consumer` might be null in certain cases, segmentation fault might happend without this null check. We
// need to handle this rare case in future.
if (consumer) {
Napi::Value ret;
try {

Question: Why is the consumer null here?

In the C++ implementation, after apache/pulsar-client-cpp#447, the client makes sure all sub-consumers are created successfully before it resumes the message listener (resumeMessageListener).

It calls the create callback first, before resuming the listener:
https://github.com/apache/pulsar-client-cpp/blob/54e529aaf82bddac063c847d4c11d3fba3acf0f3/lib/MultiTopicsConsumerImpl.cc#L151-L156

multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
// Now all child topics are successfully subscribed, start messageListeners
if (messageListener_ && !conf_.isStartPaused()) {
    LOG_INFO("Start messageListeners");
    resumeMessageListener();
}

When that callback reaches the Node.js library, the following code runs and sets the consumer object:

deferred->Resolve([cConsumer, consumerConfig, listener](const Napi::Env env) {
Napi::Object obj = Consumer::constructor.New({});
Consumer *consumer = Consumer::Unwrap(obj);
consumer->SetCConsumer(cConsumer);
consumer->SetListenerCallback(listener);
if (listener) {
// resume to enable MessageListener function callback
resume_message_listener(cConsumer.get());
}
return obj;
});

However, since the consumer is set in deferred->Resolve, which is asynchronous, it might not be fully set up when the listener starts receiving messages. Therefore, in the code above, the consumer might be null, and the message could ultimately be ignored.
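
To make the ordering concrete, here is a minimal single-threaded sketch of the drop. It is not the addon's code: SketchConsumer, NullCheckListener and CountDroppedWithNullCheck are hypothetical names, and the real listener runs across threads. The sketch only replays the problematic order of events (message first, consumer set second) with the same kind of null check.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for the wrapped consumer object (not the real addon type).
struct SketchConsumer {
  std::vector<std::string> received;
};

// Hypothetical listener state: the consumer pointer is filled in later,
// similar to how deferred->Resolve sets it on the Node.js side.
struct NullCheckListener {
  std::shared_ptr<SketchConsumer> consumer;  // null until the resolve step runs
  int dropped = 0;

  void OnMessage(const std::string &msg) {
    if (consumer) {
      consumer->received.push_back(msg);
    } else {
      ++dropped;  // the message is silently ignored, i.e. lost
    }
  }
};

// Replays the problematic ordering: one message arrives before the consumer is set.
inline int CountDroppedWithNullCheck() {
  NullCheckListener listener;
  listener.OnMessage("msg-1");                             // arrives too early
  listener.consumer = std::make_shared<SketchConsumer>();  // resolve step runs
  listener.OnMessage("msg-2");                             // delivered normally
  assert(listener.consumer->received.size() == 1);         // only msg-2 got through
  return listener.dropped;
}
```

Calling CountDroppedWithNullCheck() returns 1 in this sketch: the first message is dropped because it arrives while the consumer pointer is still null.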

Modifications

  • Use a consumer future so that the listener obtains the consumer pointer only after it has been set.
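
The idea can be sketched with the standard library's std::promise and std::shared_future. This is an illustration under assumptions, not the code in this PR: FutureSketchConsumer, FutureListener and DeliverWithFuture are hypothetical names, and the 50 ms sleep merely imitates a slow subscription. The point is that the listener waits on the future instead of checking for null, so early messages are delayed rather than dropped.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for the wrapped consumer object (not the real addon type).
struct FutureSketchConsumer {
  std::vector<std::string> received;
};

// Hypothetical listener that holds a future for the consumer instead of a raw pointer.
struct FutureListener {
  std::shared_future<std::shared_ptr<FutureSketchConsumer>> consumerFuture;

  void OnMessage(const std::string &msg) {
    // Blocks until the consumer has been set, so no message is ignored.
    std::shared_ptr<FutureSketchConsumer> consumer = consumerFuture.get();
    assert(consumer != nullptr);
    consumer->received.push_back(msg);
  }
};

// Starts delivering messages before the consumer exists, then sets the consumer.
inline std::size_t DeliverWithFuture() {
  std::promise<std::shared_ptr<FutureSketchConsumer>> promise;
  FutureListener listener{promise.get_future().share()};
  auto consumer = std::make_shared<FutureSketchConsumer>();

  // The listener thread begins receiving while the consumer is not yet set.
  std::thread listenerThread([&listener] {
    listener.OnMessage("msg-1");
    listener.OnMessage("msg-2");
  });

  // Imitate a slow subscription, then publish the consumer.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  promise.set_value(consumer);

  listenerThread.join();
  return consumer->received.size();
}
```

In this sketch DeliverWithFuture() returns 2, since both messages wait for the consumer and are then delivered. Some toolchains need -pthread when compiling code that uses std::thread.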

Verifying this change

This issue is hard to cover with a unit test.

Reproduction steps:

  1. Create a topic with 5 partitions.
  2. Create a subscription.
  3. Change the Pulsar source code as shown below. (This slows down the subscription to every partition except partition-0, which delays the overall completion of the consumer subscription.)

 protected void handleSubscribe(final CommandSubscribe subscribe) {
     checkArgument(state == State.Connected);
     final long requestId = subscribe.getRequestId();
     final long consumerId = subscribe.getConsumerId();
     TopicName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
+    if (!subscribe.getTopic().equals("persistent://public/default/test-listener2-partition-0")) {
+        try {
+            log.info("test sleep 5000ms: " + subscribe.getTopic());
+            Thread.sleep(5000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }

  4. Send 2 messages to partition-0.
  5. Create a consumer with a listener.

// Create a consumer with listener
const consumer = await client.subscribe({
  topic: 'persistent://public/default/test-listener2',
  subscription: 'sub1',
  subscriptionType: 'Shared',
  ackTimeoutMs: 100000000,
  listener: (msg, msgConsumer) => {
    console.log("Receive-msg: " + msg.getData().toString());
  },
});

When this is run repeatedly, roughly one run in every two or three receives only one message or no messages at all.

Note: With this PR, running it 100 times in a row no longer reproduces the issue.

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@shibd shibd added this to the 1.13.0 milestone Feb 28, 2025
@shibd shibd self-assigned this Feb 28, 2025
@shibd shibd merged commit b17a2e1 into apache:master Mar 2, 2025
12 checks passed
shibd added a commit that referenced this pull request Mar 2, 2025
* Fix message might lost when use listener
* Remove todo
* code format
* Remove consumer if null judge
(cherry picked from commit b17a2e1)