Skip to content

Commit 4572352

Browse files
authored
Merge branch 'apache:master' into master
2 parents 033e186 + 6c01e36 commit 4572352

File tree

6 files changed

+216
-2
lines changed

6 files changed

+216
-2
lines changed

.npmignore

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,40 @@
1-
tests
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
219
.github
20+
build-support
21+
docs
22+
examples
23+
perf
24+
pkg
25+
tests
26+
# linting
27+
.clang-format
28+
.eslintignore
29+
.eslintrc.json
30+
# ts
31+
tsconfig.json
32+
tslint.json
33+
tstest.ts
34+
typedoc.json
35+
36+
# license
37+
license-checker-config.json
38+
license-header.txt
39+
40+

index.d.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ export interface ConsumerConfig {
103103
regexSubscriptionMode?: RegexSubscriptionMode;
104104
deadLetterPolicy?: DeadLetterPolicy;
105105
batchReceivePolicy?: ConsumerBatchReceivePolicy;
106+
keySharedPolicy?: KeySharedPolicy;
106107
}
107108

108109
export class Consumer {
@@ -194,6 +195,18 @@ export interface ConsumerBatchReceivePolicy {
194195
timeoutMs?: number;
195196
}
196197

198+
export interface ConsumerKeyShareStickyRange {
199+
start: number;
200+
end: number;
201+
}
202+
export type ConsumerKeyShareStickyRanges = ConsumerKeyShareStickyRange[];
203+
204+
export interface KeySharedPolicy {
205+
keyShareMode?: ConsumerKeyShareMode;
206+
allowOutOfOrderDelivery?: boolean;
207+
stickyRanges?: ConsumerKeyShareStickyRanges;
208+
}
209+
197210
export class AuthenticationTls {
198211
constructor(params: { certificatePath: string, privateKeyPath: string });
199212
}
@@ -296,6 +309,10 @@ export type ConsumerCryptoFailureAction =
296309
'DISCARD' |
297310
'CONSUME';
298311

312+
export type ConsumerKeyShareMode =
313+
'AutoSplit' |
314+
'Sticky';
315+
299316
export type RegexSubscriptionMode =
300317
'PersistentOnly' |
301318
'NonPersistentOnly' |

src/Client.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
144144
if (clientConfig.Has(CFG_AUTH) && clientConfig.Get(CFG_AUTH).IsObject()) {
145145
Napi::Object obj = clientConfig.Get(CFG_AUTH).ToObject();
146146
if (obj.Has(CFG_AUTH_PROP) && obj.Get(CFG_AUTH_PROP).IsObject()) {
147-
Authentication *auth = Authentication::Unwrap(obj.Get(CFG_AUTH_PROP).ToObject());
147+
this->authRef_ = Napi::Persistent(obj.Get(CFG_AUTH_PROP).As<Napi::Object>());
148+
Authentication *auth = Authentication::Unwrap(this->authRef_.Value());
148149
pulsar_client_configuration_set_auth(cClientConfig.get(), auth->GetCAuthentication());
149150
}
150151
}

src/Client.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class Client : public Napi::ObjectWrap<Client> {
5454
std::shared_ptr<pulsar_client_t> cClient;
5555
std::shared_ptr<pulsar_client_configuration_t> cClientConfig;
5656
pulsar_logger_level_t logLevel = pulsar_logger_level_t::pulsar_INFO;
57+
Napi::ObjectReference authRef_;
5758

5859
Napi::Value CreateProducer(const Napi::CallbackInfo &info);
5960
Napi::Value Subscribe(const Napi::CallbackInfo &info);

src/ConsumerConfig.cc

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "Consumer.h"
2222
#include "SchemaInfo.h"
2323
#include "Message.h"
24+
#include "pulsar/ConsumerConfiguration.h"
2425
#include <pulsar/c/consumer_configuration.h>
2526
#include <pulsar/c/consumer.h>
2627
#include <map>
@@ -55,6 +56,10 @@ static const std::string CFG_BATCH_RECEIVE_POLICY = "batchReceivePolicy";
5556
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES = "maxNumMessages";
5657
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES = "maxNumBytes";
5758
static const std::string CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS = "timeoutMs";
59+
static const std::string CFG_KEY_SHARED_POLICY = "keySharedPolicy";
60+
static const std::string CFG_KEY_SHARED_POLICY_MODE = "keyShareMode";
61+
static const std::string CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER = "allowOutOfOrderDelivery";
62+
static const std::string CFG_KEY_SHARED_POLICY_STICKY_RANGES = "stickyRanges";
5863

5964
static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
6065
{"Exclusive", pulsar_ConsumerExclusive},
@@ -76,6 +81,15 @@ static const std::map<std::string, pulsar_consumer_crypto_failure_action> CONSUM
7681
{"CONSUME", pulsar_ConsumerConsume},
7782
};
7883

84+
static const std::map<std::string, pulsar::KeySharedMode> CONSUMER_KEY_SHARED_POLICY_MODE = {
85+
{"AutoSplit", pulsar::KeySharedMode::AUTO_SPLIT},
86+
{"Sticky", pulsar::KeySharedMode::STICKY},
87+
};
88+
89+
struct _pulsar_consumer_configuration {
90+
pulsar::ConsumerConfiguration consumerConfiguration;
91+
};
92+
7993
void FinalizeListenerCallback(Napi::Env env, MessageListenerCallback *cb, void *) { delete cb; }
8094

8195
ConsumerConfig::ConsumerConfig()
@@ -324,6 +338,58 @@ void ConsumerConfig::InitConfig(std::shared_ptr<ThreadSafeDeferred> deferred,
324338
return;
325339
}
326340
}
341+
342+
if (consumerConfig.Has(CFG_KEY_SHARED_POLICY) && consumerConfig.Get(CFG_KEY_SHARED_POLICY).IsObject()) {
343+
Napi::Object propObj = consumerConfig.Get(CFG_KEY_SHARED_POLICY).ToObject();
344+
pulsar::KeySharedPolicy cppKeySharedPolicy;
345+
346+
if (propObj.Has(CFG_KEY_SHARED_POLICY_MODE) && propObj.Get(CFG_KEY_SHARED_POLICY_MODE).IsString()) {
347+
std::string keyShareModeStr = propObj.Get(CFG_KEY_SHARED_POLICY_MODE).ToString().Utf8Value();
348+
if (CONSUMER_KEY_SHARED_POLICY_MODE.count(keyShareModeStr)) {
349+
cppKeySharedPolicy.setKeySharedMode(CONSUMER_KEY_SHARED_POLICY_MODE.at(keyShareModeStr));
350+
}
351+
}
352+
353+
if (propObj.Has(CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER) &&
354+
propObj.Get(CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER).IsBoolean()) {
355+
bool allowOutOfOrderDelivery = propObj.Get(CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER).ToBoolean();
356+
cppKeySharedPolicy.setAllowOutOfOrderDelivery(allowOutOfOrderDelivery);
357+
}
358+
359+
if (propObj.Has(CFG_KEY_SHARED_POLICY_STICKY_RANGES) &&
360+
propObj.Get(CFG_KEY_SHARED_POLICY_STICKY_RANGES).IsArray()) {
361+
Napi::Array rangesArray = propObj.Get(CFG_KEY_SHARED_POLICY_STICKY_RANGES).As<Napi::Array>();
362+
pulsar::StickyRanges stickyRanges;
363+
for (uint32_t i = 0; i < rangesArray.Length(); i++) {
364+
if (rangesArray.Get(i).IsObject()) {
365+
Napi::Object rangeObj = rangesArray.Get(i).ToObject();
366+
if (rangeObj.Has("start") && rangeObj.Has("end") && rangeObj.Get("start").IsNumber() &&
367+
rangeObj.Get("end").IsNumber()) {
368+
int start = rangeObj.Get("start").ToNumber().Int32Value();
369+
int end = rangeObj.Get("end").ToNumber().Int32Value();
370+
if (start > end) {
371+
std::string error = "Invalid sticky range at index " + std::to_string(i) + ": start (" +
372+
std::to_string(start) + ") > end (" + std::to_string(end) + ")";
373+
deferred->Reject(error);
374+
return;
375+
}
376+
stickyRanges.emplace_back(start, end);
377+
} else {
378+
std::string error = "Invalid sticky range format at index " + std::to_string(i) +
379+
": missing 'start'/'end' or invalid type, should be number type";
380+
deferred->Reject(error);
381+
return;
382+
}
383+
} else {
384+
std::string error = "Sticky range element at index " + std::to_string(i) + " is not an object";
385+
deferred->Reject(error);
386+
return;
387+
}
388+
}
389+
cppKeySharedPolicy.setStickyRanges(stickyRanges);
390+
}
391+
this->cConsumerConfig.get()->consumerConfiguration.setKeySharedPolicy(cppKeySharedPolicy);
392+
}
327393
}
328394

329395
ConsumerConfig::~ConsumerConfig() {

tests/consumer.test.js

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,97 @@ const Pulsar = require('../index');
429429
await producer.close();
430430
await consumer.close();
431431
});
432+
test('testStickyConsumer', async () => {
433+
const topicName = `KeySharedPolicyTest-sticky-consumer-${Date.now()}`;
434+
const subName = 'SubscriptionName';
435+
const numMessages = 1000;
436+
const numConsumers = 3;
437+
438+
// Create producer with round-robin routing
439+
const producer = await client.createProducer({
440+
topic: topicName,
441+
batchingEnabled: false,
442+
messageRoutingMode: 'RoundRobinDistribution',
443+
});
444+
445+
// Create 3 consumers with different hash ranges
446+
const consumers = [];
447+
const stickyRanges = [
448+
{ start: 0, end: 1000 }, // let consumer 1 handle small range
449+
{ start: 1001, end: 30000 },
450+
{ start: 30001, end: 65535 },
451+
];
452+
let i = 0;
453+
while (i < numConsumers) {
454+
const consumer = await client.subscribe({
455+
topic: topicName,
456+
subscription: subName,
457+
subscriptionType: 'KeyShared',
458+
keySharedPolicy: {
459+
keyShareMode: 'Sticky',
460+
stickyRanges: [stickyRanges[i]],
461+
},
462+
});
463+
consumers.push(consumer);
464+
i += 1;
465+
}
466+
467+
// Send messages with random keys
468+
const keys = Array.from({ length: 300 }, (_, index) => index.toString());
469+
let msgIndex = 0;
470+
while (msgIndex < numMessages) {
471+
const key = keys[Math.floor(Math.random() * keys.length)];
472+
await producer.send({
473+
data: Buffer.from(msgIndex.toString()),
474+
partitionKey: key,
475+
});
476+
msgIndex += 1;
477+
}
478+
479+
const assertKeyConsumerIndex = (keyToConsumer, key, expectedIndex) => {
480+
const actualIndex = keyToConsumer.get(key);
481+
expect(actualIndex).toBe(expectedIndex, `Key ${key} assigned to different consumer`);
482+
};
483+
484+
// Verify message distribution
485+
const messagesPerConsumer = Array(numConsumers).fill(0);
486+
const keyToConsumer = new Map();
487+
let messagesReceived = 0;
488+
// eslint-disable-next-line no-restricted-syntax
489+
for (const [index, consumer] of consumers.entries()) {
490+
let msg;
491+
// eslint-disable-next-line no-constant-condition
492+
while (true) {
493+
try {
494+
msg = await consumer.receive(2000);
495+
} catch (err) {
496+
if (err.message.includes('TimeOut')) {
497+
break;
498+
} else {
499+
console.error('Receive error:', err);
500+
}
501+
}
502+
const key = msg.getPartitionKey() || msg.getOrderingKey();
503+
messagesPerConsumer[index] += 1;
504+
messagesReceived += 1;
505+
if (keyToConsumer.has(key)) {
506+
assertKeyConsumerIndex(keyToConsumer, key, index);
507+
} else {
508+
keyToConsumer.set(key, index);
509+
}
510+
await consumer.acknowledge(msg);
511+
}
512+
}
513+
expect(messagesReceived).toBe(numMessages);
514+
515+
// Verify even distribution across consumers
516+
console.log('Messages per consumer:', messagesPerConsumer);
517+
// Consumer 0 are expected to receive a message count < 100
518+
expect(messagesPerConsumer[0]).toBeLessThan(100);
519+
// Consumer 1 and 2 are expected to receive a message count > 400
520+
expect(messagesPerConsumer[1]).toBeGreaterThan(400);
521+
expect(messagesPerConsumer[2]).toBeGreaterThan(400);
522+
}, 20000);
432523
});
433524
});
434525
})();

0 commit comments

Comments
 (0)