Fix message might lost when use listener #406
Merged
Add this suggestion to a batch that can be applied as a single commit. This suggestion is invalid because no changes were made to the code. Suggestions cannot be applied while the pull request is closed. Suggestions cannot be applied while viewing a subset of changes. Only one suggestion per line can be applied in a batch. Add this suggestion to a batch that can be applied as a single commit. Applying suggestions on deleted lines is not supported. You must change the existing code in this line in order to create a valid suggestion. Outdated suggestions cannot be applied. This suggestion has been applied or marked resolved. Suggestions cannot be applied from pending reviews. Suggestions cannot be applied on multi-line comments. Suggestions cannot be applied while the pull request is queued to merge. Suggestion cannot be applied right now. Please check back later.
Motivation
When using a listener to create a consumer with multiple topics, messages might be lost.
After #219, when a message reaches a listener, if the consumer is null, it will ignore this msg, and then msg will be lost.
pulsar-client-node/src/Consumer.cc
Lines 80 to 84 in 36b154e
Question: Why is the consumer null here?
In CPP implementation, after apache/pulsar-client-cpp#447, we will make sure all sub-consumers create success and then resumeListenerDispacther.
And it will call
create-callbackfirsthttps://github.com/apache/pulsar-client-cpp/blob/54e529aaf82bddac063c847d4c11d3fba3acf0f3/lib/MultiTopicsConsumerImpl.cc#L151-L156
And, when callback reach to Node.js lib. will execute code: this code will set
consumer object.pulsar-client-node/src/Consumer.cc
Lines 176 to 189 in 36b154e
However, since the consumer is set in
deferred->Resolve, which is asynchronous, it might not be fully set up when the listener starts receiving messages. Therefore, in the code above, the consumer might benull, and the message could ultimately be ignored.Modifications
Verifying this change
This issue to hard use unit test to cover it.
There is reproduction step:
protected void handleSubscribe(final CommandSubscribe subscribe) { checkArgument(state == State.Connected); final long requestId = subscribe.getRequestId(); final long consumerId = subscribe.getConsumerId(); TopicName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe); + if (!subscribe.getTopic().equals("persistent://public/default/test-listener2-partition-0")) { + try { + log.info("test sleep 5000ms: " + subscribe.getTopic()); + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }partition-0When running repeatedly, you'll encounter receiving only one message or no messages approximately every 2 to 3 times.
!!Note: After this PR, running it 100 times repeatedly no longer results in any issues.
Documentation
doc-required(Your PR needs to update docs and you will update later)
doc-not-needed(Please explain why)
doc(Your PR contains doc changes)
doc-complete(Docs have been already added)