Skip to content

Instantly share code, notes, and snippets.

@gabrielschulhof
Created August 29, 2018 13:23
Show Gist options
  • Save gabrielschulhof/e56f74cbe33b4cd4df2ac994814930a1 to your computer and use it in GitHub Desktop.
Save gabrielschulhof/e56f74cbe33b4cd4df2ac994814930a1 to your computer and use it in GitHub Desktop.
patience vs. plain
diff --git a/src/node_api.cc b/src/node_api.cc
index 7b6e43d0ce..1dd31107dc 100644
--- a/src/node_api.cc
+++ b/src/node_api.cc
@@ -827,6 +827,336 @@ napi_status ConcludeDeferred(napi_env env,
return GET_RETURN_STATUS(env);
}
+class ThreadSafeFunction : public node::AsyncResource {
+ public:
+ ThreadSafeFunction(v8::Local<v8::Function> func,
+ v8::Local<v8::Object> resource,
+ v8::Local<v8::String> name,
+ size_t thread_count_,
+ void* context_,
+ size_t max_queue_size_,
+ napi_env env_,
+ void* finalize_data_,
+ napi_finalize finalize_cb_,
+ napi_threadsafe_function_call_js call_js_cb_):
+ AsyncResource(env_->isolate,
+ resource,
+ *v8::String::Utf8Value(env_->isolate, name)),
+ thread_count(thread_count_),
+ is_closing(false),
+ context(context_),
+ max_queue_size(max_queue_size_),
+ env(env_),
+ finalize_data(finalize_data_),
+ finalize_cb(finalize_cb_),
+ call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
+ handles_closing(false) {
+ ref.Reset(env->isolate, func);
+ node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
+ }
+
+ ~ThreadSafeFunction() {
+ node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
+ }
+
+ // These methods can be called from any thread.
+
+ napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
+ node::Mutex::ScopedLock lock(this->mutex);
+
+ while (queue.size() >= max_queue_size &&
+ max_queue_size > 0 &&
+ !is_closing) {
+ if (mode == napi_tsfn_nonblocking) {
+ return napi_queue_full;
+ }
+ cond->Wait(lock);
+ }
+
+ if (is_closing) {
+ if (thread_count == 0) {
+ return napi_invalid_arg;
+ } else {
+ thread_count--;
+ return napi_closing;
+ }
+ } else {
+ if (uv_async_send(&async) != 0) {
+ return napi_generic_failure;
+ }
+ queue.push(data);
+ return napi_ok;
+ }
+ }
+
+ napi_status Acquire() {
+ node::Mutex::ScopedLock lock(this->mutex);
+
+ if (is_closing) {
+ return napi_closing;
+ }
+
+ thread_count++;
+
+ return napi_ok;
+ }
+
+ napi_status Release(napi_threadsafe_function_release_mode mode) {
+ node::Mutex::ScopedLock lock(this->mutex);
+
+ if (thread_count == 0) {
+ return napi_invalid_arg;
+ }
+
+ thread_count--;
+
+ if (thread_count == 0 || mode == napi_tsfn_abort) {
+ if (!is_closing) {
+ is_closing = (mode == napi_tsfn_abort);
+ if (is_closing && max_queue_size > 0) {
+ cond->Signal(lock);
+ }
+ if (uv_async_send(&async) != 0) {
+ return napi_generic_failure;
+ }
+ }
+ }
+
+ return napi_ok;
+ }
+
+ void EmptyQueueAndDelete() {
+ for (; !queue.empty() ; queue.pop()) {
+ call_js_cb(nullptr, nullptr, context, queue.front());
+ }
+ delete this;
+ }
+
+ // These methods must only be called from the loop thread.
+
+ napi_status Init() {
+ ThreadSafeFunction* ts_fn = this;
+
+ if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
+ if (max_queue_size > 0) {
+ cond.reset(new node::ConditionVariable);
+ }
+ if ((max_queue_size == 0 || cond.get() != nullptr) &&
+ uv_idle_init(env->loop, &idle) == 0) {
+ return napi_ok;
+ }
+
+ node::Environment::GetCurrent(env->isolate)->CloseHandle(
+ reinterpret_cast<uv_handle_t*>(&async),
+ [](uv_handle_t* handle) -> void {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::async,
+ reinterpret_cast<uv_async_t*>(handle));
+ delete ts_fn;
+ });
+
+ // Prevent the thread-safe function from being deleted here, because
+ // the callback above will delete it.
+ ts_fn = nullptr;
+ }
+
+ delete ts_fn;
+
+ return napi_generic_failure;
+ }
+
+ napi_status Unref() {
+ uv_unref(reinterpret_cast<uv_handle_t*>(&async));
+ uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
+
+ return napi_ok;
+ }
+
+ napi_status Ref() {
+ uv_ref(reinterpret_cast<uv_handle_t*>(&async));
+ uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
+
+ return napi_ok;
+ }
+
+ void DispatchOne() {
+ void* data = nullptr;
+ bool popped_value = false;
+ bool idle_stop_failed = false;
+
+ {
+ node::Mutex::ScopedLock lock(this->mutex);
+ if (is_closing) {
+ CloseHandlesAndMaybeDelete();
+ } else {
+ size_t size = queue.size();
+ if (size > 0) {
+ data = queue.front();
+ queue.pop();
+ popped_value = true;
+ if (size == max_queue_size && max_queue_size > 0) {
+ cond->Signal(lock);
+ }
+ size--;
+ }
+
+ if (size == 0) {
+ if (thread_count == 0) {
+ is_closing = true;
+ if (max_queue_size > 0) {
+ cond->Signal(lock);
+ }
+ CloseHandlesAndMaybeDelete();
+ } else {
+ if (uv_idle_stop(&idle) != 0) {
+ idle_stop_failed = true;
+ }
+ }
+ }
+ }
+ }
+
+ if (popped_value || idle_stop_failed) {
+ v8::HandleScope scope(env->isolate);
+ CallbackScope cb_scope(this);
+
+ if (idle_stop_failed) {
+ CHECK(napi_throw_error(env,
+ "ERR_NAPI_TSFN_STOP_IDLE_LOOP",
+ "Failed to stop the idle loop") == napi_ok);
+ } else {
+ v8::Local<v8::Function> js_cb =
+ v8::Local<v8::Function>::New(env->isolate, ref);
+ call_js_cb(env,
+ v8impl::JsValueFromV8LocalValue(js_cb),
+ context,
+ data);
+ }
+ }
+ }
+
+ node::Environment* NodeEnv() {
+ // For some reason grabbing the Node.js environment requires a handle scope.
+ v8::HandleScope scope(env->isolate);
+ return node::Environment::GetCurrent(env->isolate);
+ }
+
+ void MaybeStartIdle() {
+ if (uv_idle_start(&idle, IdleCb) != 0) {
+ v8::HandleScope scope(env->isolate);
+ CallbackScope cb_scope(this);
+ CHECK(napi_throw_error(env,
+ "ERR_NAPI_TSFN_START_IDLE_LOOP",
+ "Failed to start the idle loop") == napi_ok);
+ }
+ }
+
+ void Finalize() {
+ v8::HandleScope scope(env->isolate);
+ if (finalize_cb) {
+ CallbackScope cb_scope(this);
+ finalize_cb(env, finalize_data, context);
+ }
+ EmptyQueueAndDelete();
+ }
+
+ inline void* Context() {
+ return context;
+ }
+
+ void CloseHandlesAndMaybeDelete(bool set_closing = false) {
+ if (set_closing) {
+ node::Mutex::ScopedLock lock(this->mutex);
+ is_closing = true;
+ if (max_queue_size > 0) {
+ cond->Signal(lock);
+ }
+ }
+ if (handles_closing) {
+ return;
+ }
+ handles_closing = true;
+ NodeEnv()->CloseHandle(
+ reinterpret_cast<uv_handle_t*>(&async),
+ [](uv_handle_t* handle) -> void {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::async,
+ reinterpret_cast<uv_async_t*>(handle));
+ ts_fn->NodeEnv()->CloseHandle(
+ reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
+ [](uv_handle_t* handle) -> void {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::idle,
+ reinterpret_cast<uv_idle_t*>(handle));
+ ts_fn->Finalize();
+ });
+ });
+ }
+
+ // Default way of calling into JavaScript. Used when ThreadSafeFunction is
+ // without a call_js_cb_.
+ static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
+ if (!(env == nullptr || cb == nullptr)) {
+ napi_value recv;
+ napi_status status;
+
+ status = napi_get_undefined(env, &recv);
+ if (status != napi_ok) {
+ napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
+ "Failed to retrieve undefined value");
+ return;
+ }
+
+ status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
+ if (status != napi_ok && status != napi_pending_exception) {
+ napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
+ "Failed to call JS callback");
+ return;
+ }
+ }
+ }
+
+ static void IdleCb(uv_idle_t* idle) {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::idle, idle);
+ ts_fn->DispatchOne();
+ }
+
+ static void AsyncCb(uv_async_t* async) {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::async, async);
+ ts_fn->MaybeStartIdle();
+ }
+
+ static void Cleanup(void* data) {
+ reinterpret_cast<ThreadSafeFunction*>(data)
+ ->CloseHandlesAndMaybeDelete(true);
+ }
+
+ private:
+ // These are variables protected by the mutex.
+ node::Mutex mutex;
+ std::unique_ptr<node::ConditionVariable> cond;
+ std::queue<void*> queue;
+ uv_async_t async;
+ uv_idle_t idle;
+ size_t thread_count;
+ bool is_closing;
+
+ // These are variables set once, upon creation, and then never again, which
+ // means we don't need the mutex to read them.
+ void* context;
+ size_t max_queue_size;
+
+ // These are variables accessed only from the loop thread.
+ node::Persistent<v8::Function> ref;
+ napi_env env;
+ void* finalize_data;
+ napi_finalize finalize_cb;
+ napi_threadsafe_function_call_js call_js_cb;
+ bool handles_closing;
+};
+
} // end of namespace v8impl
// Intercepts the Node-V8 module registration callback. Converts parameters
@@ -3695,334 +4025,7 @@ napi_status napi_run_script(napi_env env,
return GET_RETURN_STATUS(env);
}
-class TsFn: public node::AsyncResource {
- public:
- TsFn(v8::Local<v8::Function> func,
- v8::Local<v8::Object> resource,
- v8::Local<v8::String> name,
- size_t thread_count_,
- void* context_,
- size_t max_queue_size_,
- napi_env env_,
- void* finalize_data_,
- napi_finalize finalize_cb_,
- napi_threadsafe_function_call_js call_js_cb_):
- AsyncResource(env_->isolate,
- resource,
- *v8::String::Utf8Value(env_->isolate, name)),
- thread_count(thread_count_),
- is_closing(false),
- context(context_),
- max_queue_size(max_queue_size_),
- env(env_),
- finalize_data(finalize_data_),
- finalize_cb(finalize_cb_),
- call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
- handles_closing(false) {
- ref.Reset(env->isolate, func);
- node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
- }
-
- ~TsFn() {
- node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
- }
-
- // These methods can be called from any thread.
-
- napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
- node::Mutex::ScopedLock lock(this->mutex);
-
- while (queue.size() >= max_queue_size &&
- max_queue_size > 0 &&
- !is_closing) {
- if (mode == napi_tsfn_nonblocking) {
- return napi_queue_full;
- }
- cond->Wait(lock);
- }
-
- if (is_closing) {
- if (thread_count == 0) {
- return napi_invalid_arg;
- } else {
- thread_count--;
- return napi_closing;
- }
- } else {
- if (uv_async_send(&async) != 0) {
- return napi_generic_failure;
- }
- queue.push(data);
- return napi_ok;
- }
- }
-
- napi_status Acquire() {
- node::Mutex::ScopedLock lock(this->mutex);
-
- if (is_closing) {
- return napi_closing;
- }
-
- thread_count++;
-
- return napi_ok;
- }
-
- napi_status Release(napi_threadsafe_function_release_mode mode) {
- node::Mutex::ScopedLock lock(this->mutex);
-
- if (thread_count == 0) {
- return napi_invalid_arg;
- }
-
- thread_count--;
-
- if (thread_count == 0 || mode == napi_tsfn_abort) {
- if (!is_closing) {
- is_closing = (mode == napi_tsfn_abort);
- if (is_closing && max_queue_size > 0) {
- cond->Signal(lock);
- }
- if (uv_async_send(&async) != 0) {
- return napi_generic_failure;
- }
- }
- }
-
- return napi_ok;
- }
-
- void EmptyQueueAndDelete() {
- for (; !queue.empty() ; queue.pop()) {
- call_js_cb(nullptr, nullptr, context, queue.front());
- }
- delete this;
- }
-
- // These methods must only be called from the loop thread.
-
- napi_status Init() {
- TsFn* ts_fn = this;
-
- if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
- if (max_queue_size > 0) {
- cond.reset(new node::ConditionVariable);
- }
- if ((max_queue_size == 0 || cond.get() != nullptr) &&
- uv_idle_init(env->loop, &idle) == 0) {
- return napi_ok;
- }
-
- node::Environment::GetCurrent(env->isolate)->CloseHandle(
- reinterpret_cast<uv_handle_t*>(&async),
- [] (uv_handle_t* handle) -> void {
- TsFn* ts_fn =
- node::ContainerOf(&TsFn::async,
- reinterpret_cast<uv_async_t*>(handle));
- delete ts_fn;
- });
-
- // Prevent the thread-safe function from being deleted here, because
- // the callback above will delete it.
- ts_fn = nullptr;
- }
-
- delete ts_fn;
-
- return napi_generic_failure;
- }
-
- napi_status Unref() {
- uv_unref(reinterpret_cast<uv_handle_t*>(&async));
- uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
-
- return napi_ok;
- }
-
- napi_status Ref() {
- uv_ref(reinterpret_cast<uv_handle_t*>(&async));
- uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
-
- return napi_ok;
- }
-
- void DispatchOne() {
- void* data = nullptr;
- bool popped_value = false;
- bool idle_stop_failed = false;
-
- {
- node::Mutex::ScopedLock lock(this->mutex);
- if (is_closing) {
- CloseHandlesAndMaybeDelete();
- } else {
- size_t size = queue.size();
- if (size > 0) {
- data = queue.front();
- queue.pop();
- popped_value = true;
- if (size == max_queue_size && max_queue_size > 0) {
- cond->Signal(lock);
- }
- size--;
- }
-
- if (size == 0) {
- if (thread_count == 0) {
- is_closing = true;
- if (max_queue_size > 0) {
- cond->Signal(lock);
- }
- CloseHandlesAndMaybeDelete();
- } else {
- if (uv_idle_stop(&idle) != 0) {
- idle_stop_failed = true;
- }
- }
- }
- }
- }
-
- if (popped_value || idle_stop_failed) {
- v8::HandleScope scope(env->isolate);
- CallbackScope cb_scope(this);
-
- if (idle_stop_failed) {
- CHECK(napi_throw_error(env,
- "ERR_NAPI_TSFN_STOP_IDLE_LOOP",
- "Failed to stop the idle loop") == napi_ok);
- } else {
- v8::Local<v8::Function> js_cb =
- v8::Local<v8::Function>::New(env->isolate, ref);
- call_js_cb(env,
- v8impl::JsValueFromV8LocalValue(js_cb),
- context,
- data);
- }
- }
- }
-
- node::Environment* NodeEnv() {
- // For some reason grabbing the Node.js environment requires a handle scope.
- v8::HandleScope scope(env->isolate);
- return node::Environment::GetCurrent(env->isolate);
- }
-
- void MaybeStartIdle() {
- if (uv_idle_start(&idle, IdleCb) != 0) {
- v8::HandleScope scope(env->isolate);
- CallbackScope cb_scope(this);
- CHECK(napi_throw_error(env,
- "ERR_NAPI_TSFN_START_IDLE_LOOP",
- "Failed to start the idle loop") == napi_ok);
- }
- }
-
- void Finalize() {
- v8::HandleScope scope(env->isolate);
- if (finalize_cb) {
- CallbackScope cb_scope(this);
- finalize_cb(env, finalize_data, context);
- }
- EmptyQueueAndDelete();
- }
-
- inline void* Context() {
- return context;
- }
-
- void CloseHandlesAndMaybeDelete(bool set_closing = false) {
- if (set_closing) {
- node::Mutex::ScopedLock lock(this->mutex);
- is_closing = true;
- if (max_queue_size > 0) {
- cond->Signal(lock);
- }
- }
- if (handles_closing) {
- return;
- }
- handles_closing = true;
- NodeEnv()->CloseHandle(
- reinterpret_cast<uv_handle_t*>(&async),
- [] (uv_handle_t* handle) -> void {
- TsFn* ts_fn = node::ContainerOf(&TsFn::async,
- reinterpret_cast<uv_async_t*>(handle));
- ts_fn->NodeEnv()->CloseHandle(
- reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
- [] (uv_handle_t* handle) -> void {
- TsFn* ts_fn = node::ContainerOf(&TsFn::idle,
- reinterpret_cast<uv_idle_t*>(handle));
- ts_fn->Finalize();
- });
- });
- }
-
- // Default way of calling into JavaScript. Used when TsFn is constructed
- // without a call_js_cb_.
- static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
- if (!(env == nullptr || cb == nullptr)) {
- napi_value recv;
- napi_status status;
-
- status = napi_get_undefined(env, &recv);
- if (status != napi_ok) {
- napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
- "Failed to retrieve undefined value");
- return;
- }
-
- status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
- if (status != napi_ok && status != napi_pending_exception) {
- napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
- "Failed to call JS callback");
- return;
- }
- }
- }
-
- static void IdleCb(uv_idle_t* idle) {
- TsFn* ts_fn =
- node::ContainerOf(&TsFn::idle, idle);
- ts_fn->DispatchOne();
- }
-
- static void AsyncCb(uv_async_t* async) {
- TsFn* ts_fn =
- node::ContainerOf(&TsFn::async, async);
- ts_fn->MaybeStartIdle();
- }
-
- static void Cleanup(void* data) {
- reinterpret_cast<TsFn*>(data)->CloseHandlesAndMaybeDelete(true);
- }
-
- private:
- // These are variables protected by the mutex.
- node::Mutex mutex;
- std::unique_ptr<node::ConditionVariable> cond;
- std::queue<void*> queue;
- uv_async_t async;
- uv_idle_t idle;
- size_t thread_count;
- bool is_closing;
-
- // These are variables set once, upon creation, and then never again, which
- // means we don't need the mutex to read them.
- void* context;
- size_t max_queue_size;
-
- // These are variables accessed only from the loop thread.
- node::Persistent<v8::Function> ref;
- napi_env env;
- void* finalize_data;
- napi_finalize finalize_cb;
- napi_threadsafe_function_call_js call_js_cb;
- bool handles_closing;
-};
-
-NAPI_EXTERN napi_status
+napi_status
napi_create_threadsafe_function(napi_env env,
napi_value func,
napi_value async_resource,
@@ -4057,16 +4060,17 @@ napi_create_threadsafe_function(napi_env env,
v8::Local<v8::String> v8_name;
CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name);
- TsFn* ts_fn = new TsFn(v8_func,
- v8_resource,
- v8_name,
- initial_thread_count,
- context,
- max_queue_size,
- env,
- thread_finalize_data,
- thread_finalize_cb,
- call_js_cb);
+ v8impl::ThreadSafeFunction* ts_fn =
+ new v8impl::ThreadSafeFunction(v8_func,
+ v8_resource,
+ v8_name,
+ initial_thread_count,
+ context,
+ max_queue_size,
+ env,
+ thread_finalize_data,
+ thread_finalize_cb,
+ call_js_cb);
if (ts_fn == nullptr) {
status = napi_generic_failure;
@@ -4081,45 +4085,46 @@ napi_create_threadsafe_function(napi_env env,
return napi_set_last_error(env, status);
}
-NAPI_EXTERN napi_status
+napi_status
napi_get_threadsafe_function_context(napi_threadsafe_function func,
void** result) {
CHECK(func != nullptr);
CHECK(result != nullptr);
- *result = reinterpret_cast<TsFn*>(func)->Context();
+ *result = reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Context();
return napi_ok;
}
-NAPI_EXTERN napi_status
+napi_status
napi_call_threadsafe_function(napi_threadsafe_function func,
void* data,
napi_threadsafe_function_call_mode is_blocking) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Push(data, is_blocking);
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Push(data,
+ is_blocking);
}
-NAPI_EXTERN napi_status
+napi_status
napi_acquire_threadsafe_function(napi_threadsafe_function func) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Acquire();
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Acquire();
}
-NAPI_EXTERN napi_status
+napi_status
napi_release_threadsafe_function(napi_threadsafe_function func,
napi_threadsafe_function_release_mode mode) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Release(mode);
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Release(mode);
}
-NAPI_EXTERN napi_status
+napi_status
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Unref();
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Unref();
}
-NAPI_EXTERN napi_status
+napi_status
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Ref();
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Ref();
}
diff --git a/src/node_api.cc b/src/node_api.cc
index 7b6e43d0ce..1dd31107dc 100644
--- a/src/node_api.cc
+++ b/src/node_api.cc
@@ -827,6 +827,336 @@ napi_status ConcludeDeferred(napi_env env,
return GET_RETURN_STATUS(env);
}
+class ThreadSafeFunction : public node::AsyncResource {
+ public:
+ ThreadSafeFunction(v8::Local<v8::Function> func,
+ v8::Local<v8::Object> resource,
+ v8::Local<v8::String> name,
+ size_t thread_count_,
+ void* context_,
+ size_t max_queue_size_,
+ napi_env env_,
+ void* finalize_data_,
+ napi_finalize finalize_cb_,
+ napi_threadsafe_function_call_js call_js_cb_):
+ AsyncResource(env_->isolate,
+ resource,
+ *v8::String::Utf8Value(env_->isolate, name)),
+ thread_count(thread_count_),
+ is_closing(false),
+ context(context_),
+ max_queue_size(max_queue_size_),
+ env(env_),
+ finalize_data(finalize_data_),
+ finalize_cb(finalize_cb_),
+ call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
+ handles_closing(false) {
+ ref.Reset(env->isolate, func);
+ node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
+ }
+
+ ~ThreadSafeFunction() {
+ node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
+ }
+
+ // These methods can be called from any thread.
+
+ napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
+ node::Mutex::ScopedLock lock(this->mutex);
+
+ while (queue.size() >= max_queue_size &&
+ max_queue_size > 0 &&
+ !is_closing) {
+ if (mode == napi_tsfn_nonblocking) {
+ return napi_queue_full;
+ }
+ cond->Wait(lock);
+ }
+
+ if (is_closing) {
+ if (thread_count == 0) {
+ return napi_invalid_arg;
+ } else {
+ thread_count--;
+ return napi_closing;
+ }
+ } else {
+ if (uv_async_send(&async) != 0) {
+ return napi_generic_failure;
+ }
+ queue.push(data);
+ return napi_ok;
+ }
+ }
+
+ napi_status Acquire() {
+ node::Mutex::ScopedLock lock(this->mutex);
+
+ if (is_closing) {
+ return napi_closing;
+ }
+
+ thread_count++;
+
+ return napi_ok;
+ }
+
+ napi_status Release(napi_threadsafe_function_release_mode mode) {
+ node::Mutex::ScopedLock lock(this->mutex);
+
+ if (thread_count == 0) {
+ return napi_invalid_arg;
+ }
+
+ thread_count--;
+
+ if (thread_count == 0 || mode == napi_tsfn_abort) {
+ if (!is_closing) {
+ is_closing = (mode == napi_tsfn_abort);
+ if (is_closing && max_queue_size > 0) {
+ cond->Signal(lock);
+ }
+ if (uv_async_send(&async) != 0) {
+ return napi_generic_failure;
+ }
+ }
+ }
+
+ return napi_ok;
+ }
+
+ void EmptyQueueAndDelete() {
+ for (; !queue.empty() ; queue.pop()) {
+ call_js_cb(nullptr, nullptr, context, queue.front());
+ }
+ delete this;
+ }
+
+ // These methods must only be called from the loop thread.
+
+ napi_status Init() {
+ ThreadSafeFunction* ts_fn = this;
+
+ if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
+ if (max_queue_size > 0) {
+ cond.reset(new node::ConditionVariable);
+ }
+ if ((max_queue_size == 0 || cond.get() != nullptr) &&
+ uv_idle_init(env->loop, &idle) == 0) {
+ return napi_ok;
+ }
+
+ node::Environment::GetCurrent(env->isolate)->CloseHandle(
+ reinterpret_cast<uv_handle_t*>(&async),
+ [](uv_handle_t* handle) -> void {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::async,
+ reinterpret_cast<uv_async_t*>(handle));
+ delete ts_fn;
+ });
+
+ // Prevent the thread-safe function from being deleted here, because
+ // the callback above will delete it.
+ ts_fn = nullptr;
+ }
+
+ delete ts_fn;
+
+ return napi_generic_failure;
+ }
+
+ napi_status Unref() {
+ uv_unref(reinterpret_cast<uv_handle_t*>(&async));
+ uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
+
+ return napi_ok;
+ }
+
+ napi_status Ref() {
+ uv_ref(reinterpret_cast<uv_handle_t*>(&async));
+ uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
+
+ return napi_ok;
+ }
+
+ void DispatchOne() {
+ void* data = nullptr;
+ bool popped_value = false;
+ bool idle_stop_failed = false;
+
+ {
+ node::Mutex::ScopedLock lock(this->mutex);
+ if (is_closing) {
+ CloseHandlesAndMaybeDelete();
+ } else {
+ size_t size = queue.size();
+ if (size > 0) {
+ data = queue.front();
+ queue.pop();
+ popped_value = true;
+ if (size == max_queue_size && max_queue_size > 0) {
+ cond->Signal(lock);
+ }
+ size--;
+ }
+
+ if (size == 0) {
+ if (thread_count == 0) {
+ is_closing = true;
+ if (max_queue_size > 0) {
+ cond->Signal(lock);
+ }
+ CloseHandlesAndMaybeDelete();
+ } else {
+ if (uv_idle_stop(&idle) != 0) {
+ idle_stop_failed = true;
+ }
+ }
+ }
+ }
+ }
+
+ if (popped_value || idle_stop_failed) {
+ v8::HandleScope scope(env->isolate);
+ CallbackScope cb_scope(this);
+
+ if (idle_stop_failed) {
+ CHECK(napi_throw_error(env,
+ "ERR_NAPI_TSFN_STOP_IDLE_LOOP",
+ "Failed to stop the idle loop") == napi_ok);
+ } else {
+ v8::Local<v8::Function> js_cb =
+ v8::Local<v8::Function>::New(env->isolate, ref);
+ call_js_cb(env,
+ v8impl::JsValueFromV8LocalValue(js_cb),
+ context,
+ data);
+ }
+ }
+ }
+
+ node::Environment* NodeEnv() {
+ // For some reason grabbing the Node.js environment requires a handle scope.
+ v8::HandleScope scope(env->isolate);
+ return node::Environment::GetCurrent(env->isolate);
+ }
+
+ void MaybeStartIdle() {
+ if (uv_idle_start(&idle, IdleCb) != 0) {
+ v8::HandleScope scope(env->isolate);
+ CallbackScope cb_scope(this);
+ CHECK(napi_throw_error(env,
+ "ERR_NAPI_TSFN_START_IDLE_LOOP",
+ "Failed to start the idle loop") == napi_ok);
+ }
+ }
+
+ void Finalize() {
+ v8::HandleScope scope(env->isolate);
+ if (finalize_cb) {
+ CallbackScope cb_scope(this);
+ finalize_cb(env, finalize_data, context);
+ }
+ EmptyQueueAndDelete();
+ }
+
+ inline void* Context() {
+ return context;
+ }
+
+ void CloseHandlesAndMaybeDelete(bool set_closing = false) {
+ if (set_closing) {
+ node::Mutex::ScopedLock lock(this->mutex);
+ is_closing = true;
+ if (max_queue_size > 0) {
+ cond->Signal(lock);
+ }
+ }
+ if (handles_closing) {
+ return;
+ }
+ handles_closing = true;
+ NodeEnv()->CloseHandle(
+ reinterpret_cast<uv_handle_t*>(&async),
+ [](uv_handle_t* handle) -> void {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::async,
+ reinterpret_cast<uv_async_t*>(handle));
+ ts_fn->NodeEnv()->CloseHandle(
+ reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
+ [](uv_handle_t* handle) -> void {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::idle,
+ reinterpret_cast<uv_idle_t*>(handle));
+ ts_fn->Finalize();
+ });
+ });
+ }
+
+ // Default way of calling into JavaScript. Used when ThreadSafeFunction is
+ // without a call_js_cb_.
+ static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
+ if (!(env == nullptr || cb == nullptr)) {
+ napi_value recv;
+ napi_status status;
+
+ status = napi_get_undefined(env, &recv);
+ if (status != napi_ok) {
+ napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
+ "Failed to retrieve undefined value");
+ return;
+ }
+
+ status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
+ if (status != napi_ok && status != napi_pending_exception) {
+ napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
+ "Failed to call JS callback");
+ return;
+ }
+ }
+ }
+
+ static void IdleCb(uv_idle_t* idle) {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::idle, idle);
+ ts_fn->DispatchOne();
+ }
+
+ static void AsyncCb(uv_async_t* async) {
+ ThreadSafeFunction* ts_fn =
+ node::ContainerOf(&ThreadSafeFunction::async, async);
+ ts_fn->MaybeStartIdle();
+ }
+
+ static void Cleanup(void* data) {
+ reinterpret_cast<ThreadSafeFunction*>(data)
+ ->CloseHandlesAndMaybeDelete(true);
+ }
+
+ private:
+ // These are variables protected by the mutex.
+ node::Mutex mutex;
+ std::unique_ptr<node::ConditionVariable> cond;
+ std::queue<void*> queue;
+ uv_async_t async;
+ uv_idle_t idle;
+ size_t thread_count;
+ bool is_closing;
+
+ // These are variables set once, upon creation, and then never again, which
+ // means we don't need the mutex to read them.
+ void* context;
+ size_t max_queue_size;
+
+ // These are variables accessed only from the loop thread.
+ node::Persistent<v8::Function> ref;
+ napi_env env;
+ void* finalize_data;
+ napi_finalize finalize_cb;
+ napi_threadsafe_function_call_js call_js_cb;
+ bool handles_closing;
+};
+
} // end of namespace v8impl
// Intercepts the Node-V8 module registration callback. Converts parameters
@@ -3581,448 +3911,121 @@ napi_status napi_create_async_work(napi_env env,
return napi_clear_last_error(env);
}
-napi_status napi_delete_async_work(napi_env env, napi_async_work work) {
- CHECK_ENV(env);
- CHECK_ARG(env, work);
-
- uvimpl::Work::Delete(reinterpret_cast<uvimpl::Work*>(work));
-
- return napi_clear_last_error(env);
-}
-
-napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) {
- CHECK_ENV(env);
- CHECK_ARG(env, loop);
- *loop = env->loop;
- return napi_clear_last_error(env);
-}
-
-napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
- CHECK_ENV(env);
- CHECK_ARG(env, work);
-
- napi_status status;
- uv_loop_t* event_loop = nullptr;
- status = napi_get_uv_event_loop(env, &event_loop);
- if (status != napi_ok)
- return napi_set_last_error(env, status);
-
- uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
-
- w->ScheduleWork();
-
- return napi_clear_last_error(env);
-}
-
-napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
- CHECK_ENV(env);
- CHECK_ARG(env, work);
-
- uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
-
- CALL_UV(env, w->CancelWork());
-
- return napi_clear_last_error(env);
-}
-
-napi_status napi_create_promise(napi_env env,
- napi_deferred* deferred,
- napi_value* promise) {
- NAPI_PREAMBLE(env);
- CHECK_ARG(env, deferred);
- CHECK_ARG(env, promise);
-
- auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext());
- CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure);
-
- auto v8_resolver = maybe.ToLocalChecked();
- auto v8_deferred = new node::Persistent<v8::Value>();
- v8_deferred->Reset(env->isolate, v8_resolver);
-
- *deferred = v8impl::JsDeferredFromNodePersistent(v8_deferred);
- *promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise());
- return GET_RETURN_STATUS(env);
-}
-
-napi_status napi_resolve_deferred(napi_env env,
- napi_deferred deferred,
- napi_value resolution) {
- return v8impl::ConcludeDeferred(env, deferred, resolution, true);
-}
-
-napi_status napi_reject_deferred(napi_env env,
- napi_deferred deferred,
- napi_value resolution) {
- return v8impl::ConcludeDeferred(env, deferred, resolution, false);
-}
-
-napi_status napi_is_promise(napi_env env,
- napi_value promise,
- bool* is_promise) {
- CHECK_ENV(env);
- CHECK_ARG(env, promise);
- CHECK_ARG(env, is_promise);
-
- *is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise();
-
- return napi_clear_last_error(env);
-}
-
-napi_status napi_run_script(napi_env env,
- napi_value script,
- napi_value* result) {
- NAPI_PREAMBLE(env);
- CHECK_ARG(env, script);
- CHECK_ARG(env, result);
-
- v8::Local<v8::Value> v8_script = v8impl::V8LocalValueFromJsValue(script);
-
- if (!v8_script->IsString()) {
- return napi_set_last_error(env, napi_string_expected);
- }
-
- v8::Local<v8::Context> context = env->isolate->GetCurrentContext();
-
- auto maybe_script = v8::Script::Compile(context,
- v8::Local<v8::String>::Cast(v8_script));
- CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure);
-
- auto script_result =
- maybe_script.ToLocalChecked()->Run(context);
- CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure);
-
- *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked());
- return GET_RETURN_STATUS(env);
-}
-
-class TsFn: public node::AsyncResource {
- public:
- TsFn(v8::Local<v8::Function> func,
- v8::Local<v8::Object> resource,
- v8::Local<v8::String> name,
- size_t thread_count_,
- void* context_,
- size_t max_queue_size_,
- napi_env env_,
- void* finalize_data_,
- napi_finalize finalize_cb_,
- napi_threadsafe_function_call_js call_js_cb_):
- AsyncResource(env_->isolate,
- resource,
- *v8::String::Utf8Value(env_->isolate, name)),
- thread_count(thread_count_),
- is_closing(false),
- context(context_),
- max_queue_size(max_queue_size_),
- env(env_),
- finalize_data(finalize_data_),
- finalize_cb(finalize_cb_),
- call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
- handles_closing(false) {
- ref.Reset(env->isolate, func);
- node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
- }
-
- ~TsFn() {
- node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
- }
-
- // These methods can be called from any thread.
-
- napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
- node::Mutex::ScopedLock lock(this->mutex);
-
- while (queue.size() >= max_queue_size &&
- max_queue_size > 0 &&
- !is_closing) {
- if (mode == napi_tsfn_nonblocking) {
- return napi_queue_full;
- }
- cond->Wait(lock);
- }
-
- if (is_closing) {
- if (thread_count == 0) {
- return napi_invalid_arg;
- } else {
- thread_count--;
- return napi_closing;
- }
- } else {
- if (uv_async_send(&async) != 0) {
- return napi_generic_failure;
- }
- queue.push(data);
- return napi_ok;
- }
- }
-
- napi_status Acquire() {
- node::Mutex::ScopedLock lock(this->mutex);
-
- if (is_closing) {
- return napi_closing;
- }
-
- thread_count++;
-
- return napi_ok;
- }
-
- napi_status Release(napi_threadsafe_function_release_mode mode) {
- node::Mutex::ScopedLock lock(this->mutex);
-
- if (thread_count == 0) {
- return napi_invalid_arg;
- }
-
- thread_count--;
-
- if (thread_count == 0 || mode == napi_tsfn_abort) {
- if (!is_closing) {
- is_closing = (mode == napi_tsfn_abort);
- if (is_closing && max_queue_size > 0) {
- cond->Signal(lock);
- }
- if (uv_async_send(&async) != 0) {
- return napi_generic_failure;
- }
- }
- }
-
- return napi_ok;
- }
-
- void EmptyQueueAndDelete() {
- for (; !queue.empty() ; queue.pop()) {
- call_js_cb(nullptr, nullptr, context, queue.front());
- }
- delete this;
- }
-
- // These methods must only be called from the loop thread.
-
- napi_status Init() {
- TsFn* ts_fn = this;
+napi_status napi_delete_async_work(napi_env env, napi_async_work work) {
+ CHECK_ENV(env);
+ CHECK_ARG(env, work);
- if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
- if (max_queue_size > 0) {
- cond.reset(new node::ConditionVariable);
- }
- if ((max_queue_size == 0 || cond.get() != nullptr) &&
- uv_idle_init(env->loop, &idle) == 0) {
- return napi_ok;
- }
+ uvimpl::Work::Delete(reinterpret_cast<uvimpl::Work*>(work));
- node::Environment::GetCurrent(env->isolate)->CloseHandle(
- reinterpret_cast<uv_handle_t*>(&async),
- [] (uv_handle_t* handle) -> void {
- TsFn* ts_fn =
- node::ContainerOf(&TsFn::async,
- reinterpret_cast<uv_async_t*>(handle));
- delete ts_fn;
- });
+ return napi_clear_last_error(env);
+}
- // Prevent the thread-safe function from being deleted here, because
- // the callback above will delete it.
- ts_fn = nullptr;
- }
+napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) {
+ CHECK_ENV(env);
+ CHECK_ARG(env, loop);
+ *loop = env->loop;
+ return napi_clear_last_error(env);
+}
- delete ts_fn;
+napi_status napi_queue_async_work(napi_env env, napi_async_work work) {
+ CHECK_ENV(env);
+ CHECK_ARG(env, work);
- return napi_generic_failure;
- }
+ napi_status status;
+ uv_loop_t* event_loop = nullptr;
+ status = napi_get_uv_event_loop(env, &event_loop);
+ if (status != napi_ok)
+ return napi_set_last_error(env, status);
- napi_status Unref() {
- uv_unref(reinterpret_cast<uv_handle_t*>(&async));
- uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
+ uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
- return napi_ok;
- }
+ w->ScheduleWork();
- napi_status Ref() {
- uv_ref(reinterpret_cast<uv_handle_t*>(&async));
- uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
+ return napi_clear_last_error(env);
+}
- return napi_ok;
- }
+napi_status napi_cancel_async_work(napi_env env, napi_async_work work) {
+ CHECK_ENV(env);
+ CHECK_ARG(env, work);
- void DispatchOne() {
- void* data = nullptr;
- bool popped_value = false;
- bool idle_stop_failed = false;
+ uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work);
- {
- node::Mutex::ScopedLock lock(this->mutex);
- if (is_closing) {
- CloseHandlesAndMaybeDelete();
- } else {
- size_t size = queue.size();
- if (size > 0) {
- data = queue.front();
- queue.pop();
- popped_value = true;
- if (size == max_queue_size && max_queue_size > 0) {
- cond->Signal(lock);
- }
- size--;
- }
+ CALL_UV(env, w->CancelWork());
- if (size == 0) {
- if (thread_count == 0) {
- is_closing = true;
- if (max_queue_size > 0) {
- cond->Signal(lock);
- }
- CloseHandlesAndMaybeDelete();
- } else {
- if (uv_idle_stop(&idle) != 0) {
- idle_stop_failed = true;
- }
- }
- }
- }
- }
+ return napi_clear_last_error(env);
+}
- if (popped_value || idle_stop_failed) {
- v8::HandleScope scope(env->isolate);
- CallbackScope cb_scope(this);
+napi_status napi_create_promise(napi_env env,
+ napi_deferred* deferred,
+ napi_value* promise) {
+ NAPI_PREAMBLE(env);
+ CHECK_ARG(env, deferred);
+ CHECK_ARG(env, promise);
- if (idle_stop_failed) {
- CHECK(napi_throw_error(env,
- "ERR_NAPI_TSFN_STOP_IDLE_LOOP",
- "Failed to stop the idle loop") == napi_ok);
- } else {
- v8::Local<v8::Function> js_cb =
- v8::Local<v8::Function>::New(env->isolate, ref);
- call_js_cb(env,
- v8impl::JsValueFromV8LocalValue(js_cb),
- context,
- data);
- }
- }
- }
+ auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext());
+ CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure);
- node::Environment* NodeEnv() {
- // For some reason grabbing the Node.js environment requires a handle scope.
- v8::HandleScope scope(env->isolate);
- return node::Environment::GetCurrent(env->isolate);
- }
+ auto v8_resolver = maybe.ToLocalChecked();
+ auto v8_deferred = new node::Persistent<v8::Value>();
+ v8_deferred->Reset(env->isolate, v8_resolver);
- void MaybeStartIdle() {
- if (uv_idle_start(&idle, IdleCb) != 0) {
- v8::HandleScope scope(env->isolate);
- CallbackScope cb_scope(this);
- CHECK(napi_throw_error(env,
- "ERR_NAPI_TSFN_START_IDLE_LOOP",
- "Failed to start the idle loop") == napi_ok);
- }
- }
+ *deferred = v8impl::JsDeferredFromNodePersistent(v8_deferred);
+ *promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise());
+ return GET_RETURN_STATUS(env);
+}
- void Finalize() {
- v8::HandleScope scope(env->isolate);
- if (finalize_cb) {
- CallbackScope cb_scope(this);
- finalize_cb(env, finalize_data, context);
- }
- EmptyQueueAndDelete();
- }
+napi_status napi_resolve_deferred(napi_env env,
+ napi_deferred deferred,
+ napi_value resolution) {
+ return v8impl::ConcludeDeferred(env, deferred, resolution, true);
+}
- inline void* Context() {
- return context;
- }
+napi_status napi_reject_deferred(napi_env env,
+ napi_deferred deferred,
+ napi_value resolution) {
+ return v8impl::ConcludeDeferred(env, deferred, resolution, false);
+}
- void CloseHandlesAndMaybeDelete(bool set_closing = false) {
- if (set_closing) {
- node::Mutex::ScopedLock lock(this->mutex);
- is_closing = true;
- if (max_queue_size > 0) {
- cond->Signal(lock);
- }
- }
- if (handles_closing) {
- return;
- }
- handles_closing = true;
- NodeEnv()->CloseHandle(
- reinterpret_cast<uv_handle_t*>(&async),
- [] (uv_handle_t* handle) -> void {
- TsFn* ts_fn = node::ContainerOf(&TsFn::async,
- reinterpret_cast<uv_async_t*>(handle));
- ts_fn->NodeEnv()->CloseHandle(
- reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
- [] (uv_handle_t* handle) -> void {
- TsFn* ts_fn = node::ContainerOf(&TsFn::idle,
- reinterpret_cast<uv_idle_t*>(handle));
- ts_fn->Finalize();
- });
- });
- }
+napi_status napi_is_promise(napi_env env,
+ napi_value promise,
+ bool* is_promise) {
+ CHECK_ENV(env);
+ CHECK_ARG(env, promise);
+ CHECK_ARG(env, is_promise);
- // Default way of calling into JavaScript. Used when TsFn is constructed
- // without a call_js_cb_.
- static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
- if (!(env == nullptr || cb == nullptr)) {
- napi_value recv;
- napi_status status;
+ *is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise();
- status = napi_get_undefined(env, &recv);
- if (status != napi_ok) {
- napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
- "Failed to retrieve undefined value");
- return;
- }
+ return napi_clear_last_error(env);
+}
- status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
- if (status != napi_ok && status != napi_pending_exception) {
- napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
- "Failed to call JS callback");
- return;
- }
- }
- }
+napi_status napi_run_script(napi_env env,
+ napi_value script,
+ napi_value* result) {
+ NAPI_PREAMBLE(env);
+ CHECK_ARG(env, script);
+ CHECK_ARG(env, result);
- static void IdleCb(uv_idle_t* idle) {
- TsFn* ts_fn =
- node::ContainerOf(&TsFn::idle, idle);
- ts_fn->DispatchOne();
- }
+ v8::Local<v8::Value> v8_script = v8impl::V8LocalValueFromJsValue(script);
- static void AsyncCb(uv_async_t* async) {
- TsFn* ts_fn =
- node::ContainerOf(&TsFn::async, async);
- ts_fn->MaybeStartIdle();
+ if (!v8_script->IsString()) {
+ return napi_set_last_error(env, napi_string_expected);
}
- static void Cleanup(void* data) {
- reinterpret_cast<TsFn*>(data)->CloseHandlesAndMaybeDelete(true);
- }
+ v8::Local<v8::Context> context = env->isolate->GetCurrentContext();
- private:
- // These are variables protected by the mutex.
- node::Mutex mutex;
- std::unique_ptr<node::ConditionVariable> cond;
- std::queue<void*> queue;
- uv_async_t async;
- uv_idle_t idle;
- size_t thread_count;
- bool is_closing;
+ auto maybe_script = v8::Script::Compile(context,
+ v8::Local<v8::String>::Cast(v8_script));
+ CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure);
- // These are variables set once, upon creation, and then never again, which
- // means we don't need the mutex to read them.
- void* context;
- size_t max_queue_size;
+ auto script_result =
+ maybe_script.ToLocalChecked()->Run(context);
+ CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure);
- // These are variables accessed only from the loop thread.
- node::Persistent<v8::Function> ref;
- napi_env env;
- void* finalize_data;
- napi_finalize finalize_cb;
- napi_threadsafe_function_call_js call_js_cb;
- bool handles_closing;
-};
+ *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked());
+ return GET_RETURN_STATUS(env);
+}
-NAPI_EXTERN napi_status
+napi_status
napi_create_threadsafe_function(napi_env env,
napi_value func,
napi_value async_resource,
@@ -4057,16 +4060,17 @@ napi_create_threadsafe_function(napi_env env,
v8::Local<v8::String> v8_name;
CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name);
- TsFn* ts_fn = new TsFn(v8_func,
- v8_resource,
- v8_name,
- initial_thread_count,
- context,
- max_queue_size,
- env,
- thread_finalize_data,
- thread_finalize_cb,
- call_js_cb);
+ v8impl::ThreadSafeFunction* ts_fn =
+ new v8impl::ThreadSafeFunction(v8_func,
+ v8_resource,
+ v8_name,
+ initial_thread_count,
+ context,
+ max_queue_size,
+ env,
+ thread_finalize_data,
+ thread_finalize_cb,
+ call_js_cb);
if (ts_fn == nullptr) {
status = napi_generic_failure;
@@ -4081,45 +4085,46 @@ napi_create_threadsafe_function(napi_env env,
return napi_set_last_error(env, status);
}
-NAPI_EXTERN napi_status
+napi_status
napi_get_threadsafe_function_context(napi_threadsafe_function func,
void** result) {
CHECK(func != nullptr);
CHECK(result != nullptr);
- *result = reinterpret_cast<TsFn*>(func)->Context();
+ *result = reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Context();
return napi_ok;
}
-NAPI_EXTERN napi_status
+napi_status
napi_call_threadsafe_function(napi_threadsafe_function func,
void* data,
napi_threadsafe_function_call_mode is_blocking) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Push(data, is_blocking);
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Push(data,
+ is_blocking);
}
-NAPI_EXTERN napi_status
+napi_status
napi_acquire_threadsafe_function(napi_threadsafe_function func) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Acquire();
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Acquire();
}
-NAPI_EXTERN napi_status
+napi_status
napi_release_threadsafe_function(napi_threadsafe_function func,
napi_threadsafe_function_release_mode mode) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Release(mode);
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Release(mode);
}
-NAPI_EXTERN napi_status
+napi_status
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Unref();
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Unref();
}
-NAPI_EXTERN napi_status
+napi_status
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
CHECK(func != nullptr);
- return reinterpret_cast<TsFn*>(func)->Ref();
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Ref();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment