Skip to content

Commit 42ccd43

Browse files
committed
node-api: faster threadsafe_function
Invoke threadsafe_function during the same tick and avoid marshalling costs between threads and/or churning event loop if either: 1. There's a queued call already 2. `Push()` is called while the main thread was running threadsafe_function
1 parent 9f5977a commit 42ccd43

File tree

2 files changed

+52
-10
lines changed

2 files changed

+52
-10
lines changed

src/node_api.cc

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "tracing/traced_value.h"
1313
#include "util-inl.h"
1414

15+
#include <atomic>
1516
#include <memory>
1617

1718
struct node_napi_env__ : public napi_env__ {
@@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
137138
*v8::String::Utf8Value(env_->isolate, name)),
138139
thread_count(thread_count_),
139140
is_closing(false),
141+
dispatch_state(kDispatchIdle),
140142
context(context_),
141143
max_queue_size(max_queue_size_),
142144
env(env_),
@@ -176,7 +178,7 @@ class ThreadSafeFunction : public node::AsyncResource {
176178
return napi_closing;
177179
}
178180
} else {
179-
if (uv_async_send(&async) != 0) {
181+
if (Send() != 0) {
180182
return napi_generic_failure;
181183
}
182184
queue.push(data);
@@ -211,7 +213,7 @@ class ThreadSafeFunction : public node::AsyncResource {
211213
if (is_closing && max_queue_size > 0) {
212214
cond->Signal(lock);
213215
}
214-
if (uv_async_send(&async) != 0) {
216+
if (Send() != 0) {
215217
return napi_generic_failure;
216218
}
217219
}
@@ -275,9 +277,32 @@ class ThreadSafeFunction : public node::AsyncResource {
275277
return napi_ok;
276278
}
277279

278-
void DispatchOne() {
280+
inline void* Context() {
281+
return context;
282+
}
283+
284+
protected:
285+
void Dispatch() {
286+
bool has_more = true;
287+
288+
// Limit maximum synchronous iteration count to prevent event loop
289+
// starvation. See `src/node_messaging.cc` for an inspiration.
290+
unsigned int iterations_left = kMaxIterationCount;
291+
while (has_more && --iterations_left != 0) {
292+
dispatch_state = kDispatchRunning;
293+
has_more = DispatchOne();
294+
295+
// Send() was called while we were executing the JS function
296+
if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
297+
has_more = true;
298+
}
299+
}
300+
}
301+
302+
bool DispatchOne() {
279303
void* data = nullptr;
280304
bool popped_value = false;
305+
bool has_more = false;
281306

282307
{
283308
node::Mutex::ScopedLock lock(this->mutex);
@@ -305,6 +330,8 @@ class ThreadSafeFunction : public node::AsyncResource {
305330
} else {
306331
CHECK_EQ(0, uv_idle_stop(&idle));
307332
}
333+
} else {
334+
has_more = true;
308335
}
309336
}
310337
}
@@ -322,6 +349,8 @@ class ThreadSafeFunction : public node::AsyncResource {
322349
call_js_cb(env, js_callback, context, data);
323350
});
324351
}
352+
353+
return has_more;
325354
}
326355

327356
void Finalize() {
@@ -335,10 +364,6 @@ class ThreadSafeFunction : public node::AsyncResource {
335364
EmptyQueueAndDelete();
336365
}
337366

338-
inline void* Context() {
339-
return context;
340-
}
341-
342367
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
343368
v8::HandleScope scope(env->isolate);
344369
if (set_closing) {
@@ -370,6 +395,16 @@ class ThreadSafeFunction : public node::AsyncResource {
370395
});
371396
}
372397

398+
int Send() {
399+
// Ask currently running Dispatch() to make one more iteration
400+
unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
401+
if ((current_state & kDispatchRunning) == kDispatchRunning) {
402+
return 0;
403+
}
404+
405+
return uv_async_send(&async);
406+
}
407+
373408
// Default way of calling into JavaScript. Used when ThreadSafeFunction is
374409
// without a call_js_cb_.
375410
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
@@ -396,7 +431,7 @@ class ThreadSafeFunction : public node::AsyncResource {
396431
static void IdleCb(uv_idle_t* idle) {
397432
ThreadSafeFunction* ts_fn =
398433
node::ContainerOf(&ThreadSafeFunction::idle, idle);
399-
ts_fn->DispatchOne();
434+
ts_fn->Dispatch();
400435
}
401436

402437
static void AsyncCb(uv_async_t* async) {
@@ -411,6 +446,12 @@ class ThreadSafeFunction : public node::AsyncResource {
411446
}
412447

413448
private:
449+
static const unsigned char kDispatchIdle = 0;
450+
static const unsigned char kDispatchRunning = 1 << 0;
451+
static const unsigned char kDispatchPending = 1 << 1;
452+
453+
static const unsigned int kMaxIterationCount = 1000;
454+
414455
// These are variables protected by the mutex.
415456
node::Mutex mutex;
416457
std::unique_ptr<node::ConditionVariable> cond;
@@ -419,6 +460,7 @@ class ThreadSafeFunction : public node::AsyncResource {
419460
uv_idle_t idle;
420461
size_t thread_count;
421462
bool is_closing;
463+
std::atomic_uchar dispatch_state;
422464

423465
// These are variables set once, upon creation, and then never again, which
424466
// means we don't need the mutex to read them.

test/node-api/test_threadsafe_function/test.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ function testWithJSMarshaller({
4343
binding[threadStarter](function testCallback(value) {
4444
array.push(value);
4545
if (array.length === quitAfter) {
46-
setImmediate(() => {
46+
process.nextTick(() => {
4747
binding.StopThread(common.mustCall(() => {
4848
resolve(array);
4949
}), !!abort);
@@ -85,7 +85,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
8585
// The default call-into-JS implementation passes no arguments.
8686
assert.strictEqual(arguments.length, 0);
8787
if (callCount === binding.ARRAY_LENGTH) {
88-
setImmediate(() => {
88+
process.nextTick(() => {
8989
binding.StopThread(common.mustCall(() => {
9090
resolve();
9191
}), false);

0 commit comments

Comments
 (0)