Support custom router for producer #433
Closed
Motivation
To support a custom router in the Node.js client, the most direct approach would be to call the user-provided router function from within the C++ callback. However, because the C++ client's sendAsync method has a synchronous component for partition selection before it becomes fully asynchronous, this implementation causes a deadlock between the Node.js main thread and the C++ callback thread. This is due to the single-threaded nature of the Node.js event loop.
https://github.com/apache/pulsar-client-cpp/blob/90ea3695b80660f837785d98e96047d90de3f64f/lib/PartitionedProducerImpl.cc#L209-L220
To see the problem in practice, review the PR and run the associated tests.
Modifications
This PR achieves the goal by calling the user's router function at the JS layer and then passing the resulting partition index to the C++ layer via message properties.
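The JS-layer approach can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: wrapSendWithRouter, getNumPartitions, and readPartitionFromProperties are hypothetical names, the producer is any object with an async send method, and the sketch assumes message properties hold string values. Only the __partition__ key comes from the PR description.

```javascript
// Hypothetical sketch: every name except the '__partition__' key is illustrative.
const PARTITION_KEY = '__partition__';

// Wraps a producer-like object so that send() consults the user's router first.
// getNumPartitions is an assumed async function returning the partition count.
function wrapSendWithRouter(producer, userRouter, getNumPartitions) {
  const originalSend = producer.send.bind(producer);
  producer.send = async (message) => {
    const numPartitions = await getNumPartitions();
    // The router may be sync or async; await handles both.
    const partition = await userRouter(message, numPartitions);
    if (!Number.isInteger(partition) || partition < 0 || partition >= numPartitions) {
      throw new RangeError(`router returned invalid partition: ${partition}`);
    }
    // Assumption: properties are string-valued, so the index is stored as a string.
    const properties = {
      ...(message.properties || {}),
      [PARTITION_KEY]: String(partition),
    };
    return originalSend({ ...message, properties });
  };
  return producer;
}

// Stand-in for the native-side router: it only reads the property back.
// Falls back to partition 0 when the property is missing or out of range.
function readPartitionFromProperties(properties, numPartitions) {
  const index = Number.parseInt((properties || {})[PARTITION_KEY], 10);
  return Number.isInteger(index) && index >= 0 && index < numPartitions ? index : 0;
}
```

Because the user's function runs entirely on the JS side before the native send is invoked, the native router never has to call back into JavaScript, which is the step that deadlocked in the direct approach.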
- The send method in Producer.js is wrapped. It now first calls the user's router function to asynchronously determine the target partition.
- The resulting partition index is stored in the message properties under the special key: __partition__.
- An internal C++ router (internalCppMessageRouter) is now used, which simply reads this property and returns the partition index to the underlying C++ client.
Points for Discussion
This implementation introduces two points for discussion:
- It caches the number of partitions (numPartitions) in Producer.js, which is then passed to the user's router function. (Currently, this cache has a 60-second TTL. A potential future improvement is to expose a method from the C++ producer to get the partition count directly. This would allow us to remove the JS-side cache, as the C++ client internally handles partition metadata updates.)
- It uses a special key (__partition__) in the message properties, which could potentially be confusing for users. However, I believe this is not a major issue at the moment.
Verifying this change
A Message Routing test case has been added to cover this change.
Documentation
doc-required (Your PR needs to update docs and you will update later)
doc-not-needed (Please explain why)
doc (Your PR contains doc changes)
doc-complete (Docs have been already added)