Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface ProducerConfig {
schema?: SchemaInfo;
accessMode?: ProducerAccessMode;
batchingType?: ProducerBatchType;
messageRouter?: MessageRouter;
}

export class Producer {
Expand Down Expand Up @@ -176,6 +177,18 @@ export class MessageId {
toString(): string;
}

export interface TopicMetadata {
numPartitions: number;
}

/**
* A custom message router function that can be implemented by the user.
* @param message The message to be routed.
* @param topicMetadata Metadata for the topic.
* @returns The partition index to send the message to.
*/
export type MessageRouter = (message: ProducerMessage, topicMetadata: TopicMetadata) => number;

export interface SchemaInfo {
schemaType: SchemaType;
name?: string;
Expand Down
6 changes: 4 additions & 2 deletions src/Client.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const fs = require('fs');
const tls = require('tls');
const os = require('os');
const PulsarBinding = require('./pulsar-binding');
const Producer = require('./Producer');

const certsFilePath = `${__dirname}/cert.pem`;

Expand All @@ -32,8 +33,9 @@ class Client {
this.client = new PulsarBinding.Client(params);
}

createProducer(params) {
return this.client.createProducer(params);
async createProducer(params) {
const addonProducer = await this.client.createProducer(params);
return new Producer(this, addonProducer, params);
}

subscribe(params) {
Expand Down
4 changes: 3 additions & 1 deletion src/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
auto cClient = instanceContext->cClient;
auto producerConfig = instanceContext->producerConfig;
delete instanceContext;

if (result != pulsar_result_Ok) {
Expand All @@ -81,10 +82,11 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt

std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, pulsar_producer_free);

deferred->Resolve([cProducer](const Napi::Env env) {
deferred->Resolve([cProducer, producerConfig](const Napi::Env env) {
Napi::Object obj = Producer::constructor.New({});
Producer *producer = Producer::Unwrap(obj);
producer->SetCProducer(cProducer);
producer->producerConfig = producerConfig;
return obj;
});
},
Expand Down
2 changes: 2 additions & 0 deletions src/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <napi.h>
#include <pulsar/c/client.h>
#include <pulsar/c/producer.h>
#include "ProducerConfig.h"

class Producer : public Napi::ObjectWrap<Producer> {
public:
Expand All @@ -35,6 +36,7 @@ class Producer : public Napi::ObjectWrap<Producer> {

private:
std::shared_ptr<pulsar_producer_t> cProducer;
std::shared_ptr<ProducerConfig> producerConfig;
Napi::Value Send(const Napi::CallbackInfo &info);
Napi::Value Flush(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
Expand Down
142 changes: 142 additions & 0 deletions src/Producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/**
* @typedef {import('../index').ProducerMessage} ProducerMessage
* @typedef {import('../index').MessageId} MessageId
* @typedef {import('../index').ProducerConfig} ProducerConfig
* @typedef {import('../index').Client} PulsarClient
* @typedef {import('../index').Producer} AddonProducer
*/

const PARTITION_PROP_KEY = '__partition__';
const CACHE_TTL_MS = 60 * 1000;

class Producer {
/**
* This class is a JavaScript wrapper around the C++ N-API Producer object.
* It should not be instantiated by users directly.
* @param {PulsarClient} client
* @param {Producer.h} addonProducer - The native addon producer instance.
* @param {ProducerConfig} config - The original producer configuration object.
*/
constructor(client, addonProducer, config) {
/** @private */
this.client = client;
/** @private */
this.addonProducer = addonProducer;
/** @private */
this.producerConfig = config;
/** @private */
this.numPartitions = undefined;
/** @private */
this.partitionsCacheTimestamp = 0;
}

/**
* Sends a message. If a custom message router was provided, it is called first
* to determine the partition before passing the message to the C++ addon.
* @param {ProducerMessage} message - The message object to send.
* @returns {Promise<MessageId>} A promise that resolves with the MessageId of the sent message.
*/
async send(message) {
// 1. Create a shallow copy of the message parameter at the beginning.
const finalMessage = { ...message };
const config = this.producerConfig;

// Check if custom routing mode is enabled
if (config.messageRoutingMode === 'CustomPartition') {
if (typeof config.messageRouter === 'function') {
const numPartitions = await this.getNumPartitions();
const topicMetadata = { numPartitions };
const partitionIndex = config.messageRouter(finalMessage, topicMetadata);
if (typeof partitionIndex === 'number' && partitionIndex >= 0) {
if (!finalMessage.properties) {
finalMessage.properties = {};
}
finalMessage.properties[PARTITION_PROP_KEY] = String(partitionIndex);
}
} else {
throw new Error("Producer is configured with 'CustomPartition' routing mode, "
+ "but a 'messageRouter' function was not provided.");
}
}

// 3. Pass the modified copy to the C++ addon.
return this.addonProducer.send(finalMessage);
}

/**
* Gets the number of partitions for the topic, using a cache with a TTL.
* @private
* @returns {Promise<number>}
*/
async getNumPartitions() {
const now = Date.now();
// Check if cache is missing or expired
if (this.numPartitions === undefined || now > this.partitionsCacheTimestamp + CACHE_TTL_MS) {
const partitions = await this.client.getPartitionsForTopic(this.getTopic());
this.numPartitions = partitions.length;
this.partitionsCacheTimestamp = now;
}
return this.numPartitions;
}

/**
* Flushes all the messages buffered in the client.
* @returns {Promise<null>}
*/
async flush() {
return this.addonProducer.flush();
}

/**
* Closes the producer.
* @returns {Promise<null>}
*/
async close() {
return this.addonProducer.close();
}

/**
* Gets the producer name.
* @returns {string}
*/
getProducerName() {
return this.addonProducer.getProducerName();
}

/**
* Gets the topic name.
* @returns {string}
*/
getTopic() {
return this.addonProducer.getTopic();
}

/**
* Checks if the producer is connected.
* @returns {boolean}
*/
isConnected() {
return this.addonProducer.isConnected();
}
}

module.exports = Producer;
29 changes: 26 additions & 3 deletions src/ProducerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,25 @@ static std::map<std::string, pulsar::ProducerConfiguration::BatchingType> PRODUC
{"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
};

ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
// Define a special key that the JS layer will use to pass the partition index.
static const std::string PARTITION_PROP_KEY = "__partition__";

static int internalCppMessageRouter(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata,
void *ctx) {
int numPartitions = pulsar_topic_metadata_get_num_partitions(topicMetadata);
if (pulsar_message_has_property(msg, PARTITION_PROP_KEY.c_str())) {
const char *partitionStr = pulsar_message_get_property(msg, PARTITION_PROP_KEY.c_str());
try {
return std::stoi(partitionStr);
} catch (...) {
return numPartitions;
}
}
// return numPartitions to make cpp client failed callback
return numPartitions;
}

ProducerConfig::ProducerConfig(const Napi::Object &producerConfig) : topic("") {
this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
pulsar_producer_configuration_create(), pulsar_producer_configuration_free);

Expand Down Expand Up @@ -133,9 +151,14 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {

if (producerConfig.Has(CFG_ROUTING_MODE) && producerConfig.Get(CFG_ROUTING_MODE).IsString()) {
std::string messageRoutingMode = producerConfig.Get(CFG_ROUTING_MODE).ToString().Utf8Value();
if (MESSAGE_ROUTING_MODE.count(messageRoutingMode))
if (MESSAGE_ROUTING_MODE.count(messageRoutingMode)) {
pulsar_producer_configuration_set_partitions_routing_mode(this->cProducerConfig.get(),
MESSAGE_ROUTING_MODE.at(messageRoutingMode));
}
if (messageRoutingMode == "CustomPartition") {
pulsar_producer_configuration_set_message_router(this->cProducerConfig.get(), internalCppMessageRouter,
nullptr);
}
}

if (producerConfig.Has(CFG_HASH_SCHEME) && producerConfig.Get(CFG_HASH_SCHEME).IsString()) {
Expand Down Expand Up @@ -174,7 +197,7 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
}

if (producerConfig.Has(CFG_SCHEMA) && producerConfig.Get(CFG_SCHEMA).IsObject()) {
SchemaInfo* schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject());
SchemaInfo *schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject());
schemaInfo->SetProducerSchema(this->cProducerConfig);
delete schemaInfo;
}
Expand Down
Loading
Loading