Skip to content

Instantly share code, notes, and snippets.

@matzoe
Last active March 18, 2019 11:09
Show Gist options
  • Save matzoe/11082417 to your computer and use it in GitHub Desktop.
Save matzoe/11082417 to your computer and use it in GitHub Desktop.
Node.js callback cross threads
// http://nikhilm.github.io/uvbook/threads.html
#include <stdio.h>
#include <stdlib.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <deque>
#include <map>
#include <string>
#include <node.h>
#include <v8.h>
#include <uv.h>
// Returns an OS-level identifier for the calling thread, used only to label
// log lines so that cross-thread hand-offs are visible.
//
// Returns -1 on platforms where no thread-id syscall is known. The guard is
// keyed on the operating system rather than on _POSIX_SOURCE because gettid
// is a Linux-specific syscall number.
static int thread_id() {
#if defined(__APPLE__)
  return static_cast<int>(syscall(SYS_thread_selfid));
#elif defined(__linux__)
  return static_cast<int>(syscall(SYS_gettid));
#else
  return -1;
#endif
}
class AsyncFunc {
struct payload {
v8::Persistent<v8::Function>* func;
const char* data;
};
public:
AsyncFunc(v8::Persistent<v8::Function>& function) : function(function) {
handle = (uv_async_t*)malloc(sizeof(uv_async_t));
uv_async_init(uv_default_loop(), handle, AsyncFunc::doCallback);
};
~AsyncFunc() {
uv_close((uv_handle_t*)handle, close_cb);
};
void notify(const std::string& s) {
notify(s.c_str());
};
void notify(const char* s) {
struct payload* p = new payload();
p->func = &function;
p->data = s;
handle->data = (void *)p;
uv_async_send(handle);
};
private:
uv_async_t* handle;
v8::Persistent<v8::Function> function;
static void close_cb (uv_handle_t* handle) {
free(handle);
};
static void doCallback(uv_async_t* handle, int status) {
v8::HandleScope scope;
const unsigned argc = 1;
struct payload* p = (struct payload*)handle->data;
v8::Handle<v8::Value> argv[argc] = {
v8::Local<v8::Value>::New(v8::String::New(p->data))
};
v8::TryCatch try_catch;
fprintf(stderr, "receiving message (thread::%d) ->\n", thread_id());
(*p->func)->Call(v8::Context::GetCurrent()->Global(), argc, argv);
delete p;
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
};
};
// Registry mapping a channel name to its AsyncFunc. Entries are allocated
// with `new` in On() and are never deleted anywhere in this file; assigning
// to an existing key drops the previous pointer. notify() iterates this map
// from the thread created in Start(), and no lock guards it.
std::map<std::string, AsyncFunc*> pool;
// JS binding `on(name, callback)`: stores `callback` in `pool` under `name`.
// Throws a TypeError unless called with a string followed by a function.
// Returns undefined.
v8::Handle<v8::Value> On(const v8::Arguments& args) {
  v8::HandleScope scope;

  const bool wellFormed =
      args.Length() >= 2 && args[0]->IsString() && args[1]->IsFunction();
  if (!wellFormed) {
    v8::Local<v8::String> message = v8::String::New("Wrong arguments");
    return v8::ThrowException(v8::Exception::TypeError(message));
  }

  // Copy the channel name out of V8 into a std::string usable as a map key.
  v8::String::Utf8Value channel(args[0]->ToString());
  const std::string key(*channel);

  // A persistent handle keeps the function usable after this call returns.
  v8::Local<v8::Function> listener = v8::Local<v8::Function>::Cast(args[1]);
  v8::Persistent<v8::Function> retained =
      v8::Persistent<v8::Function>::New(listener);

  AsyncFunc* const entry = new AsyncFunc(retained);
  pool[key] = entry;

  return scope.Close(v8::Undefined());
}
void notify(void *arg) {
std::map<std::string, AsyncFunc*>::iterator it;
while (1) {
sleep(1);
fprintf(stderr, "\t\t\t\t\t\t<- sending message (thread::%d)\n", thread_id());
for(it = pool.begin(); it != pool.end(); it++) {
it->second->notify("WHAT THE FUCK!");
}
}
fprintf(stderr, "Hare done running!\n");
}
// JS binding `start()`: launches a thread running notify().
// Returns undefined on success and throws an Error if the thread could not
// be created. The thread id is not kept, so the thread is never joined.
v8::Handle<v8::Value> Start(const v8::Arguments& args) {
  v8::HandleScope scope;
  uv_thread_t id;
  // Report a failed start to the caller instead of silently returning as if
  // the notifier were running.
  if (uv_thread_create(&id, notify, NULL) != 0) {
    return v8::ThrowException(v8::Exception::Error(
        v8::String::New("Failed to start notifier thread")));
  }
  return scope.Close(v8::Undefined());
}
// Module initializer: exposes On and Start on the exports object as the
// properties "on" and "start".
void RegisterModule(v8::Handle<v8::Object> target) {
  v8::Local<v8::Function> onFunction =
      v8::FunctionTemplate::New(On)->GetFunction();
  target->Set(v8::String::NewSymbol("on"), onFunction);

  v8::Local<v8::Function> startFunction =
      v8::FunctionTemplate::New(Start)->GetFunction();
  target->Set(v8::String::NewSymbol("start"), startFunction);
}
NODE_MODULE(addon, RegisterModule);
{
"targets": [
{
"target_name": "addon",
"sources": [ "addon.cc" ]
}
]
}
// Demo driver for the compiled addon.
var addon = require('./build/Release/addon');

// Logs a timestamp on every timer tick.
function reportAwake() {
  console.log('Awaken!', new Date());
}

// Logs each message the addon passes to the 'channel' callback.
function onChannelMessage(msg) {
  console.log('channel message:', msg);
}

setInterval(reportAwake, 618);
addon.on('channel', onChannelMessage);
addon.start();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment