Created
August 29, 2018 13:23
-
-
Save gabrielschulhof/e56f74cbe33b4cd4df2ac994814930a1 to your computer and use it in GitHub Desktop.
patience vs. plain
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/node_api.cc b/src/node_api.cc | |
index 7b6e43d0ce..1dd31107dc 100644 | |
--- a/src/node_api.cc | |
+++ b/src/node_api.cc | |
@@ -827,6 +827,336 @@ napi_status ConcludeDeferred(napi_env env, | |
return GET_RETURN_STATUS(env); | |
} | |
+class ThreadSafeFunction : public node::AsyncResource { | |
+ public: | |
+ ThreadSafeFunction(v8::Local<v8::Function> func, | |
+ v8::Local<v8::Object> resource, | |
+ v8::Local<v8::String> name, | |
+ size_t thread_count_, | |
+ void* context_, | |
+ size_t max_queue_size_, | |
+ napi_env env_, | |
+ void* finalize_data_, | |
+ napi_finalize finalize_cb_, | |
+ napi_threadsafe_function_call_js call_js_cb_): | |
+ AsyncResource(env_->isolate, | |
+ resource, | |
+ *v8::String::Utf8Value(env_->isolate, name)), | |
+ thread_count(thread_count_), | |
+ is_closing(false), | |
+ context(context_), | |
+ max_queue_size(max_queue_size_), | |
+ env(env_), | |
+ finalize_data(finalize_data_), | |
+ finalize_cb(finalize_cb_), | |
+ call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), | |
+ handles_closing(false) { | |
+ ref.Reset(env->isolate, func); | |
+ node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); | |
+ } | |
+ | |
+ ~ThreadSafeFunction() { | |
+ node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); | |
+ } | |
+ | |
+ // These methods can be called from any thread. | |
+ | |
+ napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ | |
+ while (queue.size() >= max_queue_size && | |
+ max_queue_size > 0 && | |
+ !is_closing) { | |
+ if (mode == napi_tsfn_nonblocking) { | |
+ return napi_queue_full; | |
+ } | |
+ cond->Wait(lock); | |
+ } | |
+ | |
+ if (is_closing) { | |
+ if (thread_count == 0) { | |
+ return napi_invalid_arg; | |
+ } else { | |
+ thread_count--; | |
+ return napi_closing; | |
+ } | |
+ } else { | |
+ if (uv_async_send(&async) != 0) { | |
+ return napi_generic_failure; | |
+ } | |
+ queue.push(data); | |
+ return napi_ok; | |
+ } | |
+ } | |
+ | |
+ napi_status Acquire() { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ | |
+ if (is_closing) { | |
+ return napi_closing; | |
+ } | |
+ | |
+ thread_count++; | |
+ | |
+ return napi_ok; | |
+ } | |
+ | |
+ napi_status Release(napi_threadsafe_function_release_mode mode) { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ | |
+ if (thread_count == 0) { | |
+ return napi_invalid_arg; | |
+ } | |
+ | |
+ thread_count--; | |
+ | |
+ if (thread_count == 0 || mode == napi_tsfn_abort) { | |
+ if (!is_closing) { | |
+ is_closing = (mode == napi_tsfn_abort); | |
+ if (is_closing && max_queue_size > 0) { | |
+ cond->Signal(lock); | |
+ } | |
+ if (uv_async_send(&async) != 0) { | |
+ return napi_generic_failure; | |
+ } | |
+ } | |
+ } | |
+ | |
+ return napi_ok; | |
+ } | |
+ | |
+ void EmptyQueueAndDelete() { | |
+ for (; !queue.empty() ; queue.pop()) { | |
+ call_js_cb(nullptr, nullptr, context, queue.front()); | |
+ } | |
+ delete this; | |
+ } | |
+ | |
+ // These methods must only be called from the loop thread. | |
+ | |
+ napi_status Init() { | |
+ ThreadSafeFunction* ts_fn = this; | |
+ | |
+ if (uv_async_init(env->loop, &async, AsyncCb) == 0) { | |
+ if (max_queue_size > 0) { | |
+ cond.reset(new node::ConditionVariable); | |
+ } | |
+ if ((max_queue_size == 0 || cond.get() != nullptr) && | |
+ uv_idle_init(env->loop, &idle) == 0) { | |
+ return napi_ok; | |
+ } | |
+ | |
+ node::Environment::GetCurrent(env->isolate)->CloseHandle( | |
+ reinterpret_cast<uv_handle_t*>(&async), | |
+ [](uv_handle_t* handle) -> void { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::async, | |
+ reinterpret_cast<uv_async_t*>(handle)); | |
+ delete ts_fn; | |
+ }); | |
+ | |
+ // Prevent the thread-safe function from being deleted here, because | |
+ // the callback above will delete it. | |
+ ts_fn = nullptr; | |
+ } | |
+ | |
+ delete ts_fn; | |
+ | |
+ return napi_generic_failure; | |
+ } | |
+ | |
+ napi_status Unref() { | |
+ uv_unref(reinterpret_cast<uv_handle_t*>(&async)); | |
+ uv_unref(reinterpret_cast<uv_handle_t*>(&idle)); | |
+ | |
+ return napi_ok; | |
+ } | |
+ | |
+ napi_status Ref() { | |
+ uv_ref(reinterpret_cast<uv_handle_t*>(&async)); | |
+ uv_ref(reinterpret_cast<uv_handle_t*>(&idle)); | |
+ | |
+ return napi_ok; | |
+ } | |
+ | |
+ void DispatchOne() { | |
+ void* data = nullptr; | |
+ bool popped_value = false; | |
+ bool idle_stop_failed = false; | |
+ | |
+ { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ if (is_closing) { | |
+ CloseHandlesAndMaybeDelete(); | |
+ } else { | |
+ size_t size = queue.size(); | |
+ if (size > 0) { | |
+ data = queue.front(); | |
+ queue.pop(); | |
+ popped_value = true; | |
+ if (size == max_queue_size && max_queue_size > 0) { | |
+ cond->Signal(lock); | |
+ } | |
+ size--; | |
+ } | |
+ | |
+ if (size == 0) { | |
+ if (thread_count == 0) { | |
+ is_closing = true; | |
+ if (max_queue_size > 0) { | |
+ cond->Signal(lock); | |
+ } | |
+ CloseHandlesAndMaybeDelete(); | |
+ } else { | |
+ if (uv_idle_stop(&idle) != 0) { | |
+ idle_stop_failed = true; | |
+ } | |
+ } | |
+ } | |
+ } | |
+ } | |
+ | |
+ if (popped_value || idle_stop_failed) { | |
+ v8::HandleScope scope(env->isolate); | |
+ CallbackScope cb_scope(this); | |
+ | |
+ if (idle_stop_failed) { | |
+ CHECK(napi_throw_error(env, | |
+ "ERR_NAPI_TSFN_STOP_IDLE_LOOP", | |
+ "Failed to stop the idle loop") == napi_ok); | |
+ } else { | |
+ v8::Local<v8::Function> js_cb = | |
+ v8::Local<v8::Function>::New(env->isolate, ref); | |
+ call_js_cb(env, | |
+ v8impl::JsValueFromV8LocalValue(js_cb), | |
+ context, | |
+ data); | |
+ } | |
+ } | |
+ } | |
+ | |
+ node::Environment* NodeEnv() { | |
+ // For some reason grabbing the Node.js environment requires a handle scope. | |
+ v8::HandleScope scope(env->isolate); | |
+ return node::Environment::GetCurrent(env->isolate); | |
+ } | |
+ | |
+ void MaybeStartIdle() { | |
+ if (uv_idle_start(&idle, IdleCb) != 0) { | |
+ v8::HandleScope scope(env->isolate); | |
+ CallbackScope cb_scope(this); | |
+ CHECK(napi_throw_error(env, | |
+ "ERR_NAPI_TSFN_START_IDLE_LOOP", | |
+ "Failed to start the idle loop") == napi_ok); | |
+ } | |
+ } | |
+ | |
+ void Finalize() { | |
+ v8::HandleScope scope(env->isolate); | |
+ if (finalize_cb) { | |
+ CallbackScope cb_scope(this); | |
+ finalize_cb(env, finalize_data, context); | |
+ } | |
+ EmptyQueueAndDelete(); | |
+ } | |
+ | |
+ inline void* Context() { | |
+ return context; | |
+ } | |
+ | |
+ void CloseHandlesAndMaybeDelete(bool set_closing = false) { | |
+ if (set_closing) { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ is_closing = true; | |
+ if (max_queue_size > 0) { | |
+ cond->Signal(lock); | |
+ } | |
+ } | |
+ if (handles_closing) { | |
+ return; | |
+ } | |
+ handles_closing = true; | |
+ NodeEnv()->CloseHandle( | |
+ reinterpret_cast<uv_handle_t*>(&async), | |
+ [](uv_handle_t* handle) -> void { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::async, | |
+ reinterpret_cast<uv_async_t*>(handle)); | |
+ ts_fn->NodeEnv()->CloseHandle( | |
+ reinterpret_cast<uv_handle_t*>(&ts_fn->idle), | |
+ [](uv_handle_t* handle) -> void { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::idle, | |
+ reinterpret_cast<uv_idle_t*>(handle)); | |
+ ts_fn->Finalize(); | |
+ }); | |
+ }); | |
+ } | |
+ | |
+ // Default way of calling into JavaScript. Used when ThreadSafeFunction is | |
+ // constructed without a call_js_cb_. | |
+ static void CallJs(napi_env env, napi_value cb, void* context, void* data) { | |
+ if (!(env == nullptr || cb == nullptr)) { | |
+ napi_value recv; | |
+ napi_status status; | |
+ | |
+ status = napi_get_undefined(env, &recv); | |
+ if (status != napi_ok) { | |
+ napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", | |
+ "Failed to retrieve undefined value"); | |
+ return; | |
+ } | |
+ | |
+ status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); | |
+ if (status != napi_ok && status != napi_pending_exception) { | |
+ napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", | |
+ "Failed to call JS callback"); | |
+ return; | |
+ } | |
+ } | |
+ } | |
+ | |
+ static void IdleCb(uv_idle_t* idle) { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::idle, idle); | |
+ ts_fn->DispatchOne(); | |
+ } | |
+ | |
+ static void AsyncCb(uv_async_t* async) { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::async, async); | |
+ ts_fn->MaybeStartIdle(); | |
+ } | |
+ | |
+ static void Cleanup(void* data) { | |
+ reinterpret_cast<ThreadSafeFunction*>(data) | |
+ ->CloseHandlesAndMaybeDelete(true); | |
+ } | |
+ | |
+ private: | |
+ // These are variables protected by the mutex. | |
+ node::Mutex mutex; | |
+ std::unique_ptr<node::ConditionVariable> cond; | |
+ std::queue<void*> queue; | |
+ uv_async_t async; | |
+ uv_idle_t idle; | |
+ size_t thread_count; | |
+ bool is_closing; | |
+ | |
+ // These are variables set once, upon creation, and then never again, which | |
+ // means we don't need the mutex to read them. | |
+ void* context; | |
+ size_t max_queue_size; | |
+ | |
+ // These are variables accessed only from the loop thread. | |
+ node::Persistent<v8::Function> ref; | |
+ napi_env env; | |
+ void* finalize_data; | |
+ napi_finalize finalize_cb; | |
+ napi_threadsafe_function_call_js call_js_cb; | |
+ bool handles_closing; | |
+}; | |
+ | |
} // end of namespace v8impl | |
// Intercepts the Node-V8 module registration callback. Converts parameters | |
@@ -3695,334 +4025,7 @@ napi_status napi_run_script(napi_env env, | |
return GET_RETURN_STATUS(env); | |
} | |
-class TsFn: public node::AsyncResource { | |
- public: | |
- TsFn(v8::Local<v8::Function> func, | |
- v8::Local<v8::Object> resource, | |
- v8::Local<v8::String> name, | |
- size_t thread_count_, | |
- void* context_, | |
- size_t max_queue_size_, | |
- napi_env env_, | |
- void* finalize_data_, | |
- napi_finalize finalize_cb_, | |
- napi_threadsafe_function_call_js call_js_cb_): | |
- AsyncResource(env_->isolate, | |
- resource, | |
- *v8::String::Utf8Value(env_->isolate, name)), | |
- thread_count(thread_count_), | |
- is_closing(false), | |
- context(context_), | |
- max_queue_size(max_queue_size_), | |
- env(env_), | |
- finalize_data(finalize_data_), | |
- finalize_cb(finalize_cb_), | |
- call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), | |
- handles_closing(false) { | |
- ref.Reset(env->isolate, func); | |
- node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); | |
- } | |
- | |
- ~TsFn() { | |
- node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); | |
- } | |
- | |
- // These methods can be called from any thread. | |
- | |
- napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- | |
- while (queue.size() >= max_queue_size && | |
- max_queue_size > 0 && | |
- !is_closing) { | |
- if (mode == napi_tsfn_nonblocking) { | |
- return napi_queue_full; | |
- } | |
- cond->Wait(lock); | |
- } | |
- | |
- if (is_closing) { | |
- if (thread_count == 0) { | |
- return napi_invalid_arg; | |
- } else { | |
- thread_count--; | |
- return napi_closing; | |
- } | |
- } else { | |
- if (uv_async_send(&async) != 0) { | |
- return napi_generic_failure; | |
- } | |
- queue.push(data); | |
- return napi_ok; | |
- } | |
- } | |
- | |
- napi_status Acquire() { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- | |
- if (is_closing) { | |
- return napi_closing; | |
- } | |
- | |
- thread_count++; | |
- | |
- return napi_ok; | |
- } | |
- | |
- napi_status Release(napi_threadsafe_function_release_mode mode) { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- | |
- if (thread_count == 0) { | |
- return napi_invalid_arg; | |
- } | |
- | |
- thread_count--; | |
- | |
- if (thread_count == 0 || mode == napi_tsfn_abort) { | |
- if (!is_closing) { | |
- is_closing = (mode == napi_tsfn_abort); | |
- if (is_closing && max_queue_size > 0) { | |
- cond->Signal(lock); | |
- } | |
- if (uv_async_send(&async) != 0) { | |
- return napi_generic_failure; | |
- } | |
- } | |
- } | |
- | |
- return napi_ok; | |
- } | |
- | |
- void EmptyQueueAndDelete() { | |
- for (; !queue.empty() ; queue.pop()) { | |
- call_js_cb(nullptr, nullptr, context, queue.front()); | |
- } | |
- delete this; | |
- } | |
- | |
- // These methods must only be called from the loop thread. | |
- | |
- napi_status Init() { | |
- TsFn* ts_fn = this; | |
- | |
- if (uv_async_init(env->loop, &async, AsyncCb) == 0) { | |
- if (max_queue_size > 0) { | |
- cond.reset(new node::ConditionVariable); | |
- } | |
- if ((max_queue_size == 0 || cond.get() != nullptr) && | |
- uv_idle_init(env->loop, &idle) == 0) { | |
- return napi_ok; | |
- } | |
- | |
- node::Environment::GetCurrent(env->isolate)->CloseHandle( | |
- reinterpret_cast<uv_handle_t*>(&async), | |
- [] (uv_handle_t* handle) -> void { | |
- TsFn* ts_fn = | |
- node::ContainerOf(&TsFn::async, | |
- reinterpret_cast<uv_async_t*>(handle)); | |
- delete ts_fn; | |
- }); | |
- | |
- // Prevent the thread-safe function from being deleted here, because | |
- // the callback above will delete it. | |
- ts_fn = nullptr; | |
- } | |
- | |
- delete ts_fn; | |
- | |
- return napi_generic_failure; | |
- } | |
- | |
- napi_status Unref() { | |
- uv_unref(reinterpret_cast<uv_handle_t*>(&async)); | |
- uv_unref(reinterpret_cast<uv_handle_t*>(&idle)); | |
- | |
- return napi_ok; | |
- } | |
- | |
- napi_status Ref() { | |
- uv_ref(reinterpret_cast<uv_handle_t*>(&async)); | |
- uv_ref(reinterpret_cast<uv_handle_t*>(&idle)); | |
- | |
- return napi_ok; | |
- } | |
- | |
- void DispatchOne() { | |
- void* data = nullptr; | |
- bool popped_value = false; | |
- bool idle_stop_failed = false; | |
- | |
- { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- if (is_closing) { | |
- CloseHandlesAndMaybeDelete(); | |
- } else { | |
- size_t size = queue.size(); | |
- if (size > 0) { | |
- data = queue.front(); | |
- queue.pop(); | |
- popped_value = true; | |
- if (size == max_queue_size && max_queue_size > 0) { | |
- cond->Signal(lock); | |
- } | |
- size--; | |
- } | |
- | |
- if (size == 0) { | |
- if (thread_count == 0) { | |
- is_closing = true; | |
- if (max_queue_size > 0) { | |
- cond->Signal(lock); | |
- } | |
- CloseHandlesAndMaybeDelete(); | |
- } else { | |
- if (uv_idle_stop(&idle) != 0) { | |
- idle_stop_failed = true; | |
- } | |
- } | |
- } | |
- } | |
- } | |
- | |
- if (popped_value || idle_stop_failed) { | |
- v8::HandleScope scope(env->isolate); | |
- CallbackScope cb_scope(this); | |
- | |
- if (idle_stop_failed) { | |
- CHECK(napi_throw_error(env, | |
- "ERR_NAPI_TSFN_STOP_IDLE_LOOP", | |
- "Failed to stop the idle loop") == napi_ok); | |
- } else { | |
- v8::Local<v8::Function> js_cb = | |
- v8::Local<v8::Function>::New(env->isolate, ref); | |
- call_js_cb(env, | |
- v8impl::JsValueFromV8LocalValue(js_cb), | |
- context, | |
- data); | |
- } | |
- } | |
- } | |
- | |
- node::Environment* NodeEnv() { | |
- // For some reason grabbing the Node.js environment requires a handle scope. | |
- v8::HandleScope scope(env->isolate); | |
- return node::Environment::GetCurrent(env->isolate); | |
- } | |
- | |
- void MaybeStartIdle() { | |
- if (uv_idle_start(&idle, IdleCb) != 0) { | |
- v8::HandleScope scope(env->isolate); | |
- CallbackScope cb_scope(this); | |
- CHECK(napi_throw_error(env, | |
- "ERR_NAPI_TSFN_START_IDLE_LOOP", | |
- "Failed to start the idle loop") == napi_ok); | |
- } | |
- } | |
- | |
- void Finalize() { | |
- v8::HandleScope scope(env->isolate); | |
- if (finalize_cb) { | |
- CallbackScope cb_scope(this); | |
- finalize_cb(env, finalize_data, context); | |
- } | |
- EmptyQueueAndDelete(); | |
- } | |
- | |
- inline void* Context() { | |
- return context; | |
- } | |
- | |
- void CloseHandlesAndMaybeDelete(bool set_closing = false) { | |
- if (set_closing) { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- is_closing = true; | |
- if (max_queue_size > 0) { | |
- cond->Signal(lock); | |
- } | |
- } | |
- if (handles_closing) { | |
- return; | |
- } | |
- handles_closing = true; | |
- NodeEnv()->CloseHandle( | |
- reinterpret_cast<uv_handle_t*>(&async), | |
- [] (uv_handle_t* handle) -> void { | |
- TsFn* ts_fn = node::ContainerOf(&TsFn::async, | |
- reinterpret_cast<uv_async_t*>(handle)); | |
- ts_fn->NodeEnv()->CloseHandle( | |
- reinterpret_cast<uv_handle_t*>(&ts_fn->idle), | |
- [] (uv_handle_t* handle) -> void { | |
- TsFn* ts_fn = node::ContainerOf(&TsFn::idle, | |
- reinterpret_cast<uv_idle_t*>(handle)); | |
- ts_fn->Finalize(); | |
- }); | |
- }); | |
- } | |
- | |
- // Default way of calling into JavaScript. Used when TsFn is constructed | |
- // without a call_js_cb_. | |
- static void CallJs(napi_env env, napi_value cb, void* context, void* data) { | |
- if (!(env == nullptr || cb == nullptr)) { | |
- napi_value recv; | |
- napi_status status; | |
- | |
- status = napi_get_undefined(env, &recv); | |
- if (status != napi_ok) { | |
- napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", | |
- "Failed to retrieve undefined value"); | |
- return; | |
- } | |
- | |
- status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); | |
- if (status != napi_ok && status != napi_pending_exception) { | |
- napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", | |
- "Failed to call JS callback"); | |
- return; | |
- } | |
- } | |
- } | |
- | |
- static void IdleCb(uv_idle_t* idle) { | |
- TsFn* ts_fn = | |
- node::ContainerOf(&TsFn::idle, idle); | |
- ts_fn->DispatchOne(); | |
- } | |
- | |
- static void AsyncCb(uv_async_t* async) { | |
- TsFn* ts_fn = | |
- node::ContainerOf(&TsFn::async, async); | |
- ts_fn->MaybeStartIdle(); | |
- } | |
- | |
- static void Cleanup(void* data) { | |
- reinterpret_cast<TsFn*>(data)->CloseHandlesAndMaybeDelete(true); | |
- } | |
- | |
- private: | |
- // These are variables protected by the mutex. | |
- node::Mutex mutex; | |
- std::unique_ptr<node::ConditionVariable> cond; | |
- std::queue<void*> queue; | |
- uv_async_t async; | |
- uv_idle_t idle; | |
- size_t thread_count; | |
- bool is_closing; | |
- | |
- // These are variables set once, upon creation, and then never again, which | |
- // means we don't need the mutex to read them. | |
- void* context; | |
- size_t max_queue_size; | |
- | |
- // These are variables accessed only from the loop thread. | |
- node::Persistent<v8::Function> ref; | |
- napi_env env; | |
- void* finalize_data; | |
- napi_finalize finalize_cb; | |
- napi_threadsafe_function_call_js call_js_cb; | |
- bool handles_closing; | |
-}; | |
- | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_create_threadsafe_function(napi_env env, | |
napi_value func, | |
napi_value async_resource, | |
@@ -4057,16 +4060,17 @@ napi_create_threadsafe_function(napi_env env, | |
v8::Local<v8::String> v8_name; | |
CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name); | |
- TsFn* ts_fn = new TsFn(v8_func, | |
- v8_resource, | |
- v8_name, | |
- initial_thread_count, | |
- context, | |
- max_queue_size, | |
- env, | |
- thread_finalize_data, | |
- thread_finalize_cb, | |
- call_js_cb); | |
+ v8impl::ThreadSafeFunction* ts_fn = | |
+ new v8impl::ThreadSafeFunction(v8_func, | |
+ v8_resource, | |
+ v8_name, | |
+ initial_thread_count, | |
+ context, | |
+ max_queue_size, | |
+ env, | |
+ thread_finalize_data, | |
+ thread_finalize_cb, | |
+ call_js_cb); | |
if (ts_fn == nullptr) { | |
status = napi_generic_failure; | |
@@ -4081,45 +4085,46 @@ napi_create_threadsafe_function(napi_env env, | |
return napi_set_last_error(env, status); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_get_threadsafe_function_context(napi_threadsafe_function func, | |
void** result) { | |
CHECK(func != nullptr); | |
CHECK(result != nullptr); | |
- *result = reinterpret_cast<TsFn*>(func)->Context(); | |
+ *result = reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Context(); | |
return napi_ok; | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_call_threadsafe_function(napi_threadsafe_function func, | |
void* data, | |
napi_threadsafe_function_call_mode is_blocking) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Push(data, is_blocking); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Push(data, | |
+ is_blocking); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_acquire_threadsafe_function(napi_threadsafe_function func) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Acquire(); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Acquire(); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_release_threadsafe_function(napi_threadsafe_function func, | |
napi_threadsafe_function_release_mode mode) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Release(mode); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Release(mode); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Unref(); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Unref(); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Ref(); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Ref(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/node_api.cc b/src/node_api.cc | |
index 7b6e43d0ce..1dd31107dc 100644 | |
--- a/src/node_api.cc | |
+++ b/src/node_api.cc | |
@@ -827,6 +827,336 @@ napi_status ConcludeDeferred(napi_env env, | |
return GET_RETURN_STATUS(env); | |
} | |
+class ThreadSafeFunction : public node::AsyncResource { | |
+ public: | |
+ ThreadSafeFunction(v8::Local<v8::Function> func, | |
+ v8::Local<v8::Object> resource, | |
+ v8::Local<v8::String> name, | |
+ size_t thread_count_, | |
+ void* context_, | |
+ size_t max_queue_size_, | |
+ napi_env env_, | |
+ void* finalize_data_, | |
+ napi_finalize finalize_cb_, | |
+ napi_threadsafe_function_call_js call_js_cb_): | |
+ AsyncResource(env_->isolate, | |
+ resource, | |
+ *v8::String::Utf8Value(env_->isolate, name)), | |
+ thread_count(thread_count_), | |
+ is_closing(false), | |
+ context(context_), | |
+ max_queue_size(max_queue_size_), | |
+ env(env_), | |
+ finalize_data(finalize_data_), | |
+ finalize_cb(finalize_cb_), | |
+ call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), | |
+ handles_closing(false) { | |
+ ref.Reset(env->isolate, func); | |
+ node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); | |
+ } | |
+ | |
+ ~ThreadSafeFunction() { | |
+ node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); | |
+ } | |
+ | |
+ // These methods can be called from any thread. | |
+ | |
+ napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ | |
+ while (queue.size() >= max_queue_size && | |
+ max_queue_size > 0 && | |
+ !is_closing) { | |
+ if (mode == napi_tsfn_nonblocking) { | |
+ return napi_queue_full; | |
+ } | |
+ cond->Wait(lock); | |
+ } | |
+ | |
+ if (is_closing) { | |
+ if (thread_count == 0) { | |
+ return napi_invalid_arg; | |
+ } else { | |
+ thread_count--; | |
+ return napi_closing; | |
+ } | |
+ } else { | |
+ if (uv_async_send(&async) != 0) { | |
+ return napi_generic_failure; | |
+ } | |
+ queue.push(data); | |
+ return napi_ok; | |
+ } | |
+ } | |
+ | |
+ napi_status Acquire() { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ | |
+ if (is_closing) { | |
+ return napi_closing; | |
+ } | |
+ | |
+ thread_count++; | |
+ | |
+ return napi_ok; | |
+ } | |
+ | |
+ napi_status Release(napi_threadsafe_function_release_mode mode) { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ | |
+ if (thread_count == 0) { | |
+ return napi_invalid_arg; | |
+ } | |
+ | |
+ thread_count--; | |
+ | |
+ if (thread_count == 0 || mode == napi_tsfn_abort) { | |
+ if (!is_closing) { | |
+ is_closing = (mode == napi_tsfn_abort); | |
+ if (is_closing && max_queue_size > 0) { | |
+ cond->Signal(lock); | |
+ } | |
+ if (uv_async_send(&async) != 0) { | |
+ return napi_generic_failure; | |
+ } | |
+ } | |
+ } | |
+ | |
+ return napi_ok; | |
+ } | |
+ | |
+ void EmptyQueueAndDelete() { | |
+ for (; !queue.empty() ; queue.pop()) { | |
+ call_js_cb(nullptr, nullptr, context, queue.front()); | |
+ } | |
+ delete this; | |
+ } | |
+ | |
+ // These methods must only be called from the loop thread. | |
+ | |
+ napi_status Init() { | |
+ ThreadSafeFunction* ts_fn = this; | |
+ | |
+ if (uv_async_init(env->loop, &async, AsyncCb) == 0) { | |
+ if (max_queue_size > 0) { | |
+ cond.reset(new node::ConditionVariable); | |
+ } | |
+ if ((max_queue_size == 0 || cond.get() != nullptr) && | |
+ uv_idle_init(env->loop, &idle) == 0) { | |
+ return napi_ok; | |
+ } | |
+ | |
+ node::Environment::GetCurrent(env->isolate)->CloseHandle( | |
+ reinterpret_cast<uv_handle_t*>(&async), | |
+ [](uv_handle_t* handle) -> void { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::async, | |
+ reinterpret_cast<uv_async_t*>(handle)); | |
+ delete ts_fn; | |
+ }); | |
+ | |
+ // Prevent the thread-safe function from being deleted here, because | |
+ // the callback above will delete it. | |
+ ts_fn = nullptr; | |
+ } | |
+ | |
+ delete ts_fn; | |
+ | |
+ return napi_generic_failure; | |
+ } | |
+ | |
+ napi_status Unref() { | |
+ uv_unref(reinterpret_cast<uv_handle_t*>(&async)); | |
+ uv_unref(reinterpret_cast<uv_handle_t*>(&idle)); | |
+ | |
+ return napi_ok; | |
+ } | |
+ | |
+ napi_status Ref() { | |
+ uv_ref(reinterpret_cast<uv_handle_t*>(&async)); | |
+ uv_ref(reinterpret_cast<uv_handle_t*>(&idle)); | |
+ | |
+ return napi_ok; | |
+ } | |
+ | |
+ void DispatchOne() { | |
+ void* data = nullptr; | |
+ bool popped_value = false; | |
+ bool idle_stop_failed = false; | |
+ | |
+ { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ if (is_closing) { | |
+ CloseHandlesAndMaybeDelete(); | |
+ } else { | |
+ size_t size = queue.size(); | |
+ if (size > 0) { | |
+ data = queue.front(); | |
+ queue.pop(); | |
+ popped_value = true; | |
+ if (size == max_queue_size && max_queue_size > 0) { | |
+ cond->Signal(lock); | |
+ } | |
+ size--; | |
+ } | |
+ | |
+ if (size == 0) { | |
+ if (thread_count == 0) { | |
+ is_closing = true; | |
+ if (max_queue_size > 0) { | |
+ cond->Signal(lock); | |
+ } | |
+ CloseHandlesAndMaybeDelete(); | |
+ } else { | |
+ if (uv_idle_stop(&idle) != 0) { | |
+ idle_stop_failed = true; | |
+ } | |
+ } | |
+ } | |
+ } | |
+ } | |
+ | |
+ if (popped_value || idle_stop_failed) { | |
+ v8::HandleScope scope(env->isolate); | |
+ CallbackScope cb_scope(this); | |
+ | |
+ if (idle_stop_failed) { | |
+ CHECK(napi_throw_error(env, | |
+ "ERR_NAPI_TSFN_STOP_IDLE_LOOP", | |
+ "Failed to stop the idle loop") == napi_ok); | |
+ } else { | |
+ v8::Local<v8::Function> js_cb = | |
+ v8::Local<v8::Function>::New(env->isolate, ref); | |
+ call_js_cb(env, | |
+ v8impl::JsValueFromV8LocalValue(js_cb), | |
+ context, | |
+ data); | |
+ } | |
+ } | |
+ } | |
+ | |
+ node::Environment* NodeEnv() { | |
+ // For some reason grabbing the Node.js environment requires a handle scope. | |
+ v8::HandleScope scope(env->isolate); | |
+ return node::Environment::GetCurrent(env->isolate); | |
+ } | |
+ | |
+ void MaybeStartIdle() { | |
+ if (uv_idle_start(&idle, IdleCb) != 0) { | |
+ v8::HandleScope scope(env->isolate); | |
+ CallbackScope cb_scope(this); | |
+ CHECK(napi_throw_error(env, | |
+ "ERR_NAPI_TSFN_START_IDLE_LOOP", | |
+ "Failed to start the idle loop") == napi_ok); | |
+ } | |
+ } | |
+ | |
+ void Finalize() { | |
+ v8::HandleScope scope(env->isolate); | |
+ if (finalize_cb) { | |
+ CallbackScope cb_scope(this); | |
+ finalize_cb(env, finalize_data, context); | |
+ } | |
+ EmptyQueueAndDelete(); | |
+ } | |
+ | |
+ inline void* Context() { | |
+ return context; | |
+ } | |
+ | |
+ void CloseHandlesAndMaybeDelete(bool set_closing = false) { | |
+ if (set_closing) { | |
+ node::Mutex::ScopedLock lock(this->mutex); | |
+ is_closing = true; | |
+ if (max_queue_size > 0) { | |
+ cond->Signal(lock); | |
+ } | |
+ } | |
+ if (handles_closing) { | |
+ return; | |
+ } | |
+ handles_closing = true; | |
+ NodeEnv()->CloseHandle( | |
+ reinterpret_cast<uv_handle_t*>(&async), | |
+ [](uv_handle_t* handle) -> void { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::async, | |
+ reinterpret_cast<uv_async_t*>(handle)); | |
+ ts_fn->NodeEnv()->CloseHandle( | |
+ reinterpret_cast<uv_handle_t*>(&ts_fn->idle), | |
+ [](uv_handle_t* handle) -> void { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::idle, | |
+ reinterpret_cast<uv_idle_t*>(handle)); | |
+ ts_fn->Finalize(); | |
+ }); | |
+ }); | |
+ } | |
+ | |
+ // Default way of calling into JavaScript. Used when ThreadSafeFunction is | |
+ // constructed without a call_js_cb_. | |
+ static void CallJs(napi_env env, napi_value cb, void* context, void* data) { | |
+ if (!(env == nullptr || cb == nullptr)) { | |
+ napi_value recv; | |
+ napi_status status; | |
+ | |
+ status = napi_get_undefined(env, &recv); | |
+ if (status != napi_ok) { | |
+ napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", | |
+ "Failed to retrieve undefined value"); | |
+ return; | |
+ } | |
+ | |
+ status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); | |
+ if (status != napi_ok && status != napi_pending_exception) { | |
+ napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", | |
+ "Failed to call JS callback"); | |
+ return; | |
+ } | |
+ } | |
+ } | |
+ | |
+ static void IdleCb(uv_idle_t* idle) { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::idle, idle); | |
+ ts_fn->DispatchOne(); | |
+ } | |
+ | |
+ static void AsyncCb(uv_async_t* async) { | |
+ ThreadSafeFunction* ts_fn = | |
+ node::ContainerOf(&ThreadSafeFunction::async, async); | |
+ ts_fn->MaybeStartIdle(); | |
+ } | |
+ | |
+ static void Cleanup(void* data) { | |
+ reinterpret_cast<ThreadSafeFunction*>(data) | |
+ ->CloseHandlesAndMaybeDelete(true); | |
+ } | |
+ | |
+ private: | |
+ // These are variables protected by the mutex. | |
+ node::Mutex mutex; | |
+ std::unique_ptr<node::ConditionVariable> cond; | |
+ std::queue<void*> queue; | |
+ uv_async_t async; | |
+ uv_idle_t idle; | |
+ size_t thread_count; | |
+ bool is_closing; | |
+ | |
+ // These are variables set once, upon creation, and then never again, which | |
+ // means we don't need the mutex to read them. | |
+ void* context; | |
+ size_t max_queue_size; | |
+ | |
+ // These are variables accessed only from the loop thread. | |
+ node::Persistent<v8::Function> ref; | |
+ napi_env env; | |
+ void* finalize_data; | |
+ napi_finalize finalize_cb; | |
+ napi_threadsafe_function_call_js call_js_cb; | |
+ bool handles_closing; | |
+}; | |
+ | |
} // end of namespace v8impl | |
// Intercepts the Node-V8 module registration callback. Converts parameters | |
@@ -3581,448 +3911,121 @@ napi_status napi_create_async_work(napi_env env, | |
return napi_clear_last_error(env); | |
} | |
-napi_status napi_delete_async_work(napi_env env, napi_async_work work) { | |
- CHECK_ENV(env); | |
- CHECK_ARG(env, work); | |
- | |
- uvimpl::Work::Delete(reinterpret_cast<uvimpl::Work*>(work)); | |
- | |
- return napi_clear_last_error(env); | |
-} | |
- | |
-napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) { | |
- CHECK_ENV(env); | |
- CHECK_ARG(env, loop); | |
- *loop = env->loop; | |
- return napi_clear_last_error(env); | |
-} | |
- | |
-napi_status napi_queue_async_work(napi_env env, napi_async_work work) { | |
- CHECK_ENV(env); | |
- CHECK_ARG(env, work); | |
- | |
- napi_status status; | |
- uv_loop_t* event_loop = nullptr; | |
- status = napi_get_uv_event_loop(env, &event_loop); | |
- if (status != napi_ok) | |
- return napi_set_last_error(env, status); | |
- | |
- uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work); | |
- | |
- w->ScheduleWork(); | |
- | |
- return napi_clear_last_error(env); | |
-} | |
- | |
-napi_status napi_cancel_async_work(napi_env env, napi_async_work work) { | |
- CHECK_ENV(env); | |
- CHECK_ARG(env, work); | |
- | |
- uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work); | |
- | |
- CALL_UV(env, w->CancelWork()); | |
- | |
- return napi_clear_last_error(env); | |
-} | |
- | |
-napi_status napi_create_promise(napi_env env, | |
- napi_deferred* deferred, | |
- napi_value* promise) { | |
- NAPI_PREAMBLE(env); | |
- CHECK_ARG(env, deferred); | |
- CHECK_ARG(env, promise); | |
- | |
- auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext()); | |
- CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure); | |
- | |
- auto v8_resolver = maybe.ToLocalChecked(); | |
- auto v8_deferred = new node::Persistent<v8::Value>(); | |
- v8_deferred->Reset(env->isolate, v8_resolver); | |
- | |
- *deferred = v8impl::JsDeferredFromNodePersistent(v8_deferred); | |
- *promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise()); | |
- return GET_RETURN_STATUS(env); | |
-} | |
- | |
-napi_status napi_resolve_deferred(napi_env env, | |
- napi_deferred deferred, | |
- napi_value resolution) { | |
- return v8impl::ConcludeDeferred(env, deferred, resolution, true); | |
-} | |
- | |
-napi_status napi_reject_deferred(napi_env env, | |
- napi_deferred deferred, | |
- napi_value resolution) { | |
- return v8impl::ConcludeDeferred(env, deferred, resolution, false); | |
-} | |
- | |
-napi_status napi_is_promise(napi_env env, | |
- napi_value promise, | |
- bool* is_promise) { | |
- CHECK_ENV(env); | |
- CHECK_ARG(env, promise); | |
- CHECK_ARG(env, is_promise); | |
- | |
- *is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise(); | |
- | |
- return napi_clear_last_error(env); | |
-} | |
- | |
-napi_status napi_run_script(napi_env env, | |
- napi_value script, | |
- napi_value* result) { | |
- NAPI_PREAMBLE(env); | |
- CHECK_ARG(env, script); | |
- CHECK_ARG(env, result); | |
- | |
- v8::Local<v8::Value> v8_script = v8impl::V8LocalValueFromJsValue(script); | |
- | |
- if (!v8_script->IsString()) { | |
- return napi_set_last_error(env, napi_string_expected); | |
- } | |
- | |
- v8::Local<v8::Context> context = env->isolate->GetCurrentContext(); | |
- | |
- auto maybe_script = v8::Script::Compile(context, | |
- v8::Local<v8::String>::Cast(v8_script)); | |
- CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure); | |
- | |
- auto script_result = | |
- maybe_script.ToLocalChecked()->Run(context); | |
- CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure); | |
- | |
- *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked()); | |
- return GET_RETURN_STATUS(env); | |
-} | |
- | |
-class TsFn: public node::AsyncResource { | |
- public: | |
- TsFn(v8::Local<v8::Function> func, | |
- v8::Local<v8::Object> resource, | |
- v8::Local<v8::String> name, | |
- size_t thread_count_, | |
- void* context_, | |
- size_t max_queue_size_, | |
- napi_env env_, | |
- void* finalize_data_, | |
- napi_finalize finalize_cb_, | |
- napi_threadsafe_function_call_js call_js_cb_): | |
- AsyncResource(env_->isolate, | |
- resource, | |
- *v8::String::Utf8Value(env_->isolate, name)), | |
- thread_count(thread_count_), | |
- is_closing(false), | |
- context(context_), | |
- max_queue_size(max_queue_size_), | |
- env(env_), | |
- finalize_data(finalize_data_), | |
- finalize_cb(finalize_cb_), | |
- call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), | |
- handles_closing(false) { | |
- ref.Reset(env->isolate, func); | |
- node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); | |
- } | |
- | |
- ~TsFn() { | |
- node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); | |
- } | |
- | |
- // These methods can be called from any thread. | |
- | |
- napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- | |
- while (queue.size() >= max_queue_size && | |
- max_queue_size > 0 && | |
- !is_closing) { | |
- if (mode == napi_tsfn_nonblocking) { | |
- return napi_queue_full; | |
- } | |
- cond->Wait(lock); | |
- } | |
- | |
- if (is_closing) { | |
- if (thread_count == 0) { | |
- return napi_invalid_arg; | |
- } else { | |
- thread_count--; | |
- return napi_closing; | |
- } | |
- } else { | |
- if (uv_async_send(&async) != 0) { | |
- return napi_generic_failure; | |
- } | |
- queue.push(data); | |
- return napi_ok; | |
- } | |
- } | |
- | |
- napi_status Acquire() { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- | |
- if (is_closing) { | |
- return napi_closing; | |
- } | |
- | |
- thread_count++; | |
- | |
- return napi_ok; | |
- } | |
- | |
- napi_status Release(napi_threadsafe_function_release_mode mode) { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- | |
- if (thread_count == 0) { | |
- return napi_invalid_arg; | |
- } | |
- | |
- thread_count--; | |
- | |
- if (thread_count == 0 || mode == napi_tsfn_abort) { | |
- if (!is_closing) { | |
- is_closing = (mode == napi_tsfn_abort); | |
- if (is_closing && max_queue_size > 0) { | |
- cond->Signal(lock); | |
- } | |
- if (uv_async_send(&async) != 0) { | |
- return napi_generic_failure; | |
- } | |
- } | |
- } | |
- | |
- return napi_ok; | |
- } | |
- | |
- void EmptyQueueAndDelete() { | |
- for (; !queue.empty() ; queue.pop()) { | |
- call_js_cb(nullptr, nullptr, context, queue.front()); | |
- } | |
- delete this; | |
- } | |
- | |
- // These methods must only be called from the loop thread. | |
- | |
- napi_status Init() { | |
- TsFn* ts_fn = this; | |
+napi_status napi_delete_async_work(napi_env env, napi_async_work work) { | |
+ CHECK_ENV(env); | |
+ CHECK_ARG(env, work); | |
- if (uv_async_init(env->loop, &async, AsyncCb) == 0) { | |
- if (max_queue_size > 0) { | |
- cond.reset(new node::ConditionVariable); | |
- } | |
- if ((max_queue_size == 0 || cond.get() != nullptr) && | |
- uv_idle_init(env->loop, &idle) == 0) { | |
- return napi_ok; | |
- } | |
+ uvimpl::Work::Delete(reinterpret_cast<uvimpl::Work*>(work)); | |
- node::Environment::GetCurrent(env->isolate)->CloseHandle( | |
- reinterpret_cast<uv_handle_t*>(&async), | |
- [] (uv_handle_t* handle) -> void { | |
- TsFn* ts_fn = | |
- node::ContainerOf(&TsFn::async, | |
- reinterpret_cast<uv_async_t*>(handle)); | |
- delete ts_fn; | |
- }); | |
+ return napi_clear_last_error(env); | |
+} | |
- // Prevent the thread-safe function from being deleted here, because | |
- // the callback above will delete it. | |
- ts_fn = nullptr; | |
- } | |
+napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) { | |
+ CHECK_ENV(env); | |
+ CHECK_ARG(env, loop); | |
+ *loop = env->loop; | |
+ return napi_clear_last_error(env); | |
+} | |
- delete ts_fn; | |
+napi_status napi_queue_async_work(napi_env env, napi_async_work work) { | |
+ CHECK_ENV(env); | |
+ CHECK_ARG(env, work); | |
- return napi_generic_failure; | |
- } | |
+ napi_status status; | |
+ uv_loop_t* event_loop = nullptr; | |
+ status = napi_get_uv_event_loop(env, &event_loop); | |
+ if (status != napi_ok) | |
+ return napi_set_last_error(env, status); | |
- napi_status Unref() { | |
- uv_unref(reinterpret_cast<uv_handle_t*>(&async)); | |
- uv_unref(reinterpret_cast<uv_handle_t*>(&idle)); | |
+ uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work); | |
- return napi_ok; | |
- } | |
+ w->ScheduleWork(); | |
- napi_status Ref() { | |
- uv_ref(reinterpret_cast<uv_handle_t*>(&async)); | |
- uv_ref(reinterpret_cast<uv_handle_t*>(&idle)); | |
+ return napi_clear_last_error(env); | |
+} | |
- return napi_ok; | |
- } | |
+napi_status napi_cancel_async_work(napi_env env, napi_async_work work) { | |
+ CHECK_ENV(env); | |
+ CHECK_ARG(env, work); | |
- void DispatchOne() { | |
- void* data = nullptr; | |
- bool popped_value = false; | |
- bool idle_stop_failed = false; | |
+ uvimpl::Work* w = reinterpret_cast<uvimpl::Work*>(work); | |
- { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- if (is_closing) { | |
- CloseHandlesAndMaybeDelete(); | |
- } else { | |
- size_t size = queue.size(); | |
- if (size > 0) { | |
- data = queue.front(); | |
- queue.pop(); | |
- popped_value = true; | |
- if (size == max_queue_size && max_queue_size > 0) { | |
- cond->Signal(lock); | |
- } | |
- size--; | |
- } | |
+ CALL_UV(env, w->CancelWork()); | |
- if (size == 0) { | |
- if (thread_count == 0) { | |
- is_closing = true; | |
- if (max_queue_size > 0) { | |
- cond->Signal(lock); | |
- } | |
- CloseHandlesAndMaybeDelete(); | |
- } else { | |
- if (uv_idle_stop(&idle) != 0) { | |
- idle_stop_failed = true; | |
- } | |
- } | |
- } | |
- } | |
- } | |
+ return napi_clear_last_error(env); | |
+} | |
- if (popped_value || idle_stop_failed) { | |
- v8::HandleScope scope(env->isolate); | |
- CallbackScope cb_scope(this); | |
+napi_status napi_create_promise(napi_env env, | |
+ napi_deferred* deferred, | |
+ napi_value* promise) { | |
+ NAPI_PREAMBLE(env); | |
+ CHECK_ARG(env, deferred); | |
+ CHECK_ARG(env, promise); | |
- if (idle_stop_failed) { | |
- CHECK(napi_throw_error(env, | |
- "ERR_NAPI_TSFN_STOP_IDLE_LOOP", | |
- "Failed to stop the idle loop") == napi_ok); | |
- } else { | |
- v8::Local<v8::Function> js_cb = | |
- v8::Local<v8::Function>::New(env->isolate, ref); | |
- call_js_cb(env, | |
- v8impl::JsValueFromV8LocalValue(js_cb), | |
- context, | |
- data); | |
- } | |
- } | |
- } | |
+ auto maybe = v8::Promise::Resolver::New(env->isolate->GetCurrentContext()); | |
+ CHECK_MAYBE_EMPTY(env, maybe, napi_generic_failure); | |
- node::Environment* NodeEnv() { | |
- // For some reason grabbing the Node.js environment requires a handle scope. | |
- v8::HandleScope scope(env->isolate); | |
- return node::Environment::GetCurrent(env->isolate); | |
- } | |
+ auto v8_resolver = maybe.ToLocalChecked(); | |
+ auto v8_deferred = new node::Persistent<v8::Value>(); | |
+ v8_deferred->Reset(env->isolate, v8_resolver); | |
- void MaybeStartIdle() { | |
- if (uv_idle_start(&idle, IdleCb) != 0) { | |
- v8::HandleScope scope(env->isolate); | |
- CallbackScope cb_scope(this); | |
- CHECK(napi_throw_error(env, | |
- "ERR_NAPI_TSFN_START_IDLE_LOOP", | |
- "Failed to start the idle loop") == napi_ok); | |
- } | |
- } | |
+ *deferred = v8impl::JsDeferredFromNodePersistent(v8_deferred); | |
+ *promise = v8impl::JsValueFromV8LocalValue(v8_resolver->GetPromise()); | |
+ return GET_RETURN_STATUS(env); | |
+} | |
- void Finalize() { | |
- v8::HandleScope scope(env->isolate); | |
- if (finalize_cb) { | |
- CallbackScope cb_scope(this); | |
- finalize_cb(env, finalize_data, context); | |
- } | |
- EmptyQueueAndDelete(); | |
- } | |
+napi_status napi_resolve_deferred(napi_env env, | |
+ napi_deferred deferred, | |
+ napi_value resolution) { | |
+ return v8impl::ConcludeDeferred(env, deferred, resolution, true); | |
+} | |
- inline void* Context() { | |
- return context; | |
- } | |
+napi_status napi_reject_deferred(napi_env env, | |
+ napi_deferred deferred, | |
+ napi_value resolution) { | |
+ return v8impl::ConcludeDeferred(env, deferred, resolution, false); | |
+} | |
- void CloseHandlesAndMaybeDelete(bool set_closing = false) { | |
- if (set_closing) { | |
- node::Mutex::ScopedLock lock(this->mutex); | |
- is_closing = true; | |
- if (max_queue_size > 0) { | |
- cond->Signal(lock); | |
- } | |
- } | |
- if (handles_closing) { | |
- return; | |
- } | |
- handles_closing = true; | |
- NodeEnv()->CloseHandle( | |
- reinterpret_cast<uv_handle_t*>(&async), | |
- [] (uv_handle_t* handle) -> void { | |
- TsFn* ts_fn = node::ContainerOf(&TsFn::async, | |
- reinterpret_cast<uv_async_t*>(handle)); | |
- ts_fn->NodeEnv()->CloseHandle( | |
- reinterpret_cast<uv_handle_t*>(&ts_fn->idle), | |
- [] (uv_handle_t* handle) -> void { | |
- TsFn* ts_fn = node::ContainerOf(&TsFn::idle, | |
- reinterpret_cast<uv_idle_t*>(handle)); | |
- ts_fn->Finalize(); | |
- }); | |
- }); | |
- } | |
+napi_status napi_is_promise(napi_env env, | |
+ napi_value promise, | |
+ bool* is_promise) { | |
+ CHECK_ENV(env); | |
+ CHECK_ARG(env, promise); | |
+ CHECK_ARG(env, is_promise); | |
- // Default way of calling into JavaScript. Used when TsFn is constructed | |
- // without a call_js_cb_. | |
- static void CallJs(napi_env env, napi_value cb, void* context, void* data) { | |
- if (!(env == nullptr || cb == nullptr)) { | |
- napi_value recv; | |
- napi_status status; | |
+ *is_promise = v8impl::V8LocalValueFromJsValue(promise)->IsPromise(); | |
- status = napi_get_undefined(env, &recv); | |
- if (status != napi_ok) { | |
- napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", | |
- "Failed to retrieve undefined value"); | |
- return; | |
- } | |
+ return napi_clear_last_error(env); | |
+} | |
- status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); | |
- if (status != napi_ok && status != napi_pending_exception) { | |
- napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", | |
- "Failed to call JS callback"); | |
- return; | |
- } | |
- } | |
- } | |
+napi_status napi_run_script(napi_env env, | |
+ napi_value script, | |
+ napi_value* result) { | |
+ NAPI_PREAMBLE(env); | |
+ CHECK_ARG(env, script); | |
+ CHECK_ARG(env, result); | |
- static void IdleCb(uv_idle_t* idle) { | |
- TsFn* ts_fn = | |
- node::ContainerOf(&TsFn::idle, idle); | |
- ts_fn->DispatchOne(); | |
- } | |
+ v8::Local<v8::Value> v8_script = v8impl::V8LocalValueFromJsValue(script); | |
- static void AsyncCb(uv_async_t* async) { | |
- TsFn* ts_fn = | |
- node::ContainerOf(&TsFn::async, async); | |
- ts_fn->MaybeStartIdle(); | |
+ if (!v8_script->IsString()) { | |
+ return napi_set_last_error(env, napi_string_expected); | |
} | |
- static void Cleanup(void* data) { | |
- reinterpret_cast<TsFn*>(data)->CloseHandlesAndMaybeDelete(true); | |
- } | |
+ v8::Local<v8::Context> context = env->isolate->GetCurrentContext(); | |
- private: | |
- // These are variables protected by the mutex. | |
- node::Mutex mutex; | |
- std::unique_ptr<node::ConditionVariable> cond; | |
- std::queue<void*> queue; | |
- uv_async_t async; | |
- uv_idle_t idle; | |
- size_t thread_count; | |
- bool is_closing; | |
+ auto maybe_script = v8::Script::Compile(context, | |
+ v8::Local<v8::String>::Cast(v8_script)); | |
+ CHECK_MAYBE_EMPTY(env, maybe_script, napi_generic_failure); | |
- // These are variables set once, upon creation, and then never again, which | |
- // means we don't need the mutex to read them. | |
- void* context; | |
- size_t max_queue_size; | |
+ auto script_result = | |
+ maybe_script.ToLocalChecked()->Run(context); | |
+ CHECK_MAYBE_EMPTY(env, script_result, napi_generic_failure); | |
- // These are variables accessed only from the loop thread. | |
- node::Persistent<v8::Function> ref; | |
- napi_env env; | |
- void* finalize_data; | |
- napi_finalize finalize_cb; | |
- napi_threadsafe_function_call_js call_js_cb; | |
- bool handles_closing; | |
-}; | |
+ *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked()); | |
+ return GET_RETURN_STATUS(env); | |
+} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_create_threadsafe_function(napi_env env, | |
napi_value func, | |
napi_value async_resource, | |
@@ -4057,16 +4060,17 @@ napi_create_threadsafe_function(napi_env env, | |
v8::Local<v8::String> v8_name; | |
CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name); | |
- TsFn* ts_fn = new TsFn(v8_func, | |
- v8_resource, | |
- v8_name, | |
- initial_thread_count, | |
- context, | |
- max_queue_size, | |
- env, | |
- thread_finalize_data, | |
- thread_finalize_cb, | |
- call_js_cb); | |
+ v8impl::ThreadSafeFunction* ts_fn = | |
+ new v8impl::ThreadSafeFunction(v8_func, | |
+ v8_resource, | |
+ v8_name, | |
+ initial_thread_count, | |
+ context, | |
+ max_queue_size, | |
+ env, | |
+ thread_finalize_data, | |
+ thread_finalize_cb, | |
+ call_js_cb); | |
if (ts_fn == nullptr) { | |
status = napi_generic_failure; | |
@@ -4081,45 +4085,46 @@ napi_create_threadsafe_function(napi_env env, | |
return napi_set_last_error(env, status); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_get_threadsafe_function_context(napi_threadsafe_function func, | |
void** result) { | |
CHECK(func != nullptr); | |
CHECK(result != nullptr); | |
- *result = reinterpret_cast<TsFn*>(func)->Context(); | |
+ *result = reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Context(); | |
return napi_ok; | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_call_threadsafe_function(napi_threadsafe_function func, | |
void* data, | |
napi_threadsafe_function_call_mode is_blocking) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Push(data, is_blocking); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Push(data, | |
+ is_blocking); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_acquire_threadsafe_function(napi_threadsafe_function func) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Acquire(); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Acquire(); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_release_threadsafe_function(napi_threadsafe_function func, | |
napi_threadsafe_function_release_mode mode) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Release(mode); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Release(mode); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Unref(); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Unref(); | |
} | |
-NAPI_EXTERN napi_status | |
+napi_status | |
napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) { | |
CHECK(func != nullptr); | |
- return reinterpret_cast<TsFn*>(func)->Ref(); | |
+ return reinterpret_cast<v8impl::ThreadSafeFunction*>(func)->Ref(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment