Created
August 18, 2013 19:06
-
-
Save richardhundt/6263372 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "ray.h" | |
static ray_fiber_t* RAY_MAIN; | |
static uv_async_t RAY_PUMP; | |
void ray_pump(void) { | |
uv_async_send(&RAY_PUMP); | |
} | |
ray_fiber_t* ray_current(lua_State* L) { | |
(void)L; | |
return (ray_fiber_t*)uv_default_loop()->data; | |
} | |
int ray_suspend(ray_fiber_t* fiber) { | |
TRACE("suspending: %p\n", fiber); | |
return lua_yield(fiber->L, lua_gettop(fiber->L)); | |
} | |
void ray_sched_cb(uv_idle_t* idle, int status) { | |
ray_fiber_t* self = container_of(idle, ray_fiber_t, hook); | |
uv_idle_stop(&self->hook); | |
ray_resume(self, LUA_MULTRET); | |
} | |
int ray_ready(ray_fiber_t* fiber) { | |
uv_idle_start(&fiber->hook, ray_sched_cb); | |
return 1; | |
} | |
int ray_finish(ray_fiber_t* self) { | |
TRACE("finish: %p\n", self); | |
while (!ngx_queue_empty(&self->fiber_queue)) { | |
int i, n; | |
ngx_queue_t* q = ngx_queue_head(&self->fiber_queue); | |
ray_fiber_t* f = ngx_queue_data(q, ray_fiber_t, queue); | |
ngx_queue_remove(q); | |
ngx_queue_init(q); | |
n = lua_gettop(self->L); | |
for (i = 1; i <= n; i++) { | |
lua_pushvalue(self->L, i); | |
} | |
lua_xmove(self->L, f->L, n); | |
ray_resume(f, n); | |
} | |
uv_idle_stop(&self->hook); | |
if (!ngx_queue_empty(&self->queue)) { | |
ngx_queue_remove(&self->queue); | |
ngx_queue_init(&self->queue); | |
} | |
lua_settop(self->L, 0); | |
luaL_unref(self->L, LUA_REGISTRYINDEX, self->L_ref); | |
luaL_unref(self->L, LUA_REGISTRYINDEX, self->ref); | |
return 1; | |
} | |
int ray_resume(ray_fiber_t* fiber, int narg) { | |
TRACE("resuming: %p\n", fiber); | |
if (fiber->flags & RAY_STARTED && lua_status(fiber->L) != LUA_YIELD) { | |
TRACE("refusing to resume dead coroutine\n"); | |
return -1; | |
} | |
if (fiber == RAY_MAIN) { | |
TRACE("refusing to resume RAY_MAIN\n"); | |
return -1; | |
} | |
if (narg == LUA_MULTRET) { | |
narg = lua_gettop(fiber->L); | |
} | |
if (!(fiber->flags & RAY_STARTED)) { | |
fiber->flags |= RAY_STARTED; | |
--narg; | |
} | |
uv_loop_t* loop = uv_default_loop(); | |
void* prev = loop->data; | |
loop->data = fiber; | |
assert(fiber != RAY_MAIN); | |
int rc = lua_resume(fiber->L, narg); | |
loop->data = prev; | |
switch (rc) { | |
case LUA_YIELD: { | |
break; | |
} | |
case 0: { | |
ray_finish(fiber); | |
break; | |
} | |
default: { | |
return luaL_error(RAY_MAIN->L, lua_tostring(fiber->L, -1)); | |
} | |
} | |
return 1; | |
} | |
int ray_push_error(lua_State* L, uv_errno_t err) { | |
lua_settop(L, 0); | |
lua_pushnil(L); | |
lua_pushstring(L, uv_err_name(err)); | |
return 2; | |
} | |
int ray_run(lua_State* L) { | |
uv_run(uv_default_loop(), UV_RUN_DEFAULT); | |
return 0; | |
} | |
void ray_buf_need(ray_buf_t* buf, size_t len) { | |
size_t size = buf->size; | |
size_t need = buf->offs + len; | |
if (need > buf->size) { | |
while (size < need) size *= 2; | |
buf->base = (char*)realloc(buf->base, size); | |
buf->size = size; | |
} | |
} | |
void ray_buf_init(ray_buf_t* buf) { | |
buf->base = calloc(1, RAY_BUF_SIZE); | |
buf->size = RAY_BUF_SIZE; | |
buf->offs = 0; | |
} | |
void ray_buf_write(ray_buf_t* buf, char* data, size_t len) { | |
ray_buf_need(buf, len); | |
memcpy(buf->base + buf->offs, data, len); | |
buf->offs += len; | |
} | |
const char* ray_buf_read(ray_buf_t* buf, size_t* len) { | |
*len = buf->offs; | |
buf->offs = 0; | |
return buf->base; | |
} | |
void ray_buf_clear(ray_buf_t* buf) { | |
memset(buf->base, 0, buf->size); | |
buf->offs = 0; | |
} | |
int ray_fiber_new(lua_State* L) { | |
ray_fiber_t* self = lua_newuserdata(L, sizeof(ray_fiber_t)); | |
luaL_getmetatable(L, "ray.fiber"); | |
lua_setmetatable(L, -2); | |
lua_insert(L, 1); | |
lua_pushvalue(L, 1); | |
self->ref = luaL_ref(L, LUA_REGISTRYINDEX); | |
self->L = lua_newthread(L); | |
self->L_ref = luaL_ref(L, LUA_REGISTRYINDEX); | |
self->flags = 0; | |
ngx_queue_init(&self->queue); | |
ngx_queue_init(&self->fiber_queue); | |
uv_idle_init(uv_default_loop(), &self->hook); | |
lua_xmove(L, self->L, lua_gettop(L) - 1); | |
ray_ready(self); | |
return 1; | |
} | |
int ray_fiber_ready(lua_State* L) { | |
ray_fiber_t* self = (ray_fiber_t*)lua_touserdata(L, 1); | |
ray_ready(self); | |
return 1; | |
} | |
int ray_fiber_join(lua_State* L) { | |
ray_fiber_t* self = (ray_fiber_t*)lua_touserdata(L, 1); | |
ray_fiber_t* curr = ray_current(L); | |
assert(ngx_queue_empty(&curr->queue)); | |
ngx_queue_insert_tail(&self->fiber_queue, &curr->queue); | |
ray_ready(self); | |
return ray_suspend(curr); | |
} | |
ray_agent_t* ray_agent_new(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_newuserdata(L, sizeof(ray_agent_t)); | |
self->flags = 0; | |
self->count = 0; | |
self->data = NULL; | |
ray_buf_init(&self->buf); | |
ngx_queue_init(&self->fiber_queue); | |
lua_pushvalue(L, 1); | |
self->ref = luaL_ref(L, LUA_REGISTRYINDEX); | |
return self; | |
} | |
/* ========================================================================== */ | |
/* timer */ | |
/* ========================================================================== */ | |
int ray_timer_new(lua_State* L) { | |
ray_agent_t* self = ray_agent_new(L); | |
luaL_getmetatable(L, "ray.timer"); | |
lua_setmetatable(L, -2); | |
int rc = uv_timer_init(uv_default_loop(), &self->h.timer); | |
if (rc) return ray_push_error(L, rc); | |
return 1; | |
} | |
void ray_timer_cb(uv_timer_t* timer, int status) { | |
ray_agent_t* self = container_of(timer, ray_agent_t, h); | |
ngx_queue_t* tail = ngx_queue_last(&self->fiber_queue); | |
while (!ngx_queue_empty(&self->fiber_queue)) { | |
ngx_queue_t* q = ngx_queue_head(&self->fiber_queue); | |
ray_fiber_t* f = ngx_queue_data(q, ray_fiber_t, queue); | |
ngx_queue_remove(q); | |
ngx_queue_init(q); | |
lua_settop(f->L, 0); | |
lua_pushboolean(f->L, 1); | |
ray_resume(f, 1); | |
if (q == tail) break; | |
} | |
} | |
int ray_timer_start(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.timer"); | |
int64_t timeout = luaL_optlong(L, 2, 0L); | |
int64_t repeat = luaL_optlong(L, 3, 0L); | |
int rc = uv_timer_start(&self->h.timer, ray_timer_cb, timeout, repeat); | |
if (rc) return ray_push_error(L, rc); | |
return 1; | |
} | |
int ray_timer_stop(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.timer"); | |
int rc = uv_timer_stop(&self->h.timer); | |
if (rc) return ray_push_error(L, rc); | |
return 1; | |
} | |
int ray_timer_wait(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.timer"); | |
ray_fiber_t* curr = ray_current(L); | |
assert(ngx_queue_empty(&curr->queue)); | |
ngx_queue_insert_tail(&self->fiber_queue, &curr->queue); | |
return ray_suspend(curr); | |
} | |
/* ========================================================================== */ | |
/* stream */ | |
/* ========================================================================== */ | |
void ray_close_cb(uv_handle_t* handle) { | |
ray_agent_t* self = container_of(handle, ray_agent_t, h); | |
TRACE("closed: %p\n", self); | |
ray_fiber_t* curr = (ray_fiber_t*)self->data; | |
if (curr) ray_resume(curr, 0); | |
if (self->ref != LUA_NOREF) { | |
TRACE("UNREF: %i!\n", self->ref); | |
luaL_unref(RAY_MAIN->L, LUA_REGISTRYINDEX, self->ref); | |
self->ref = LUA_NOREF; | |
} | |
} | |
int ray_close(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
if (!uv_is_closing(&self->h.handle)) { | |
ray_fiber_t* curr = ray_current(L); | |
self->data = curr; | |
uv_close(&self->h.handle, ray_close_cb); | |
return ray_suspend(curr); | |
} | |
return 0; | |
} | |
uv_buf_t ray_alloc_cb(uv_handle_t* h, size_t len) { | |
ray_agent_t* self = container_of(h, ray_agent_t, h); | |
if (len > RAY_BUF_SIZE) len = RAY_BUF_SIZE; | |
ray_buf_need(&self->buf, len); | |
return uv_buf_init(self->buf.base + self->buf.offs, len); | |
} | |
void ray_close_null_cb(uv_handle_t* handle) { | |
TRACE("running null close cb - data: %p\n", handle->data); | |
} | |
void ray_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) { | |
ray_agent_t* self = container_of(stream, ray_agent_t, h); | |
uv_read_stop(stream); | |
self->flags &= ~RAY_STARTED; | |
TRACE("nread: %i\n", (int)nread); | |
if (nread >= 0) { | |
assert(!ngx_queue_empty(&self->fiber_queue)); | |
ngx_queue_t* q = ngx_queue_head(&self->fiber_queue); | |
ray_fiber_t* f = ngx_queue_data(q, ray_fiber_t, queue); | |
ngx_queue_remove(q); | |
ngx_queue_init(q); | |
lua_pushlstring(f->L, (const char*)buf.base, nread); | |
ray_buf_clear(&self->buf); | |
ray_resume(f, 1); | |
} | |
else { | |
while (!ngx_queue_empty(&self->fiber_queue)) { | |
ngx_queue_t* q = ngx_queue_head(&self->fiber_queue); | |
ray_fiber_t* f = ngx_queue_data(q, ray_fiber_t, queue); | |
ngx_queue_remove(q); | |
ngx_queue_init(q); | |
ray_resume(f, ray_push_error(f->L, nread)); | |
} | |
if (!uv_is_closing((uv_handle_t*)stream)) { | |
uv_close((uv_handle_t*)stream, ray_close_null_cb); | |
} | |
} | |
} | |
int ray_read(lua_State* L) { | |
TRACE("reading\n"); | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
ray_fiber_t* curr = ray_current(L); | |
assert((self->flags & RAY_STARTED) == 0); | |
self->flags |= RAY_STARTED; | |
uv_read_start(&self->h.stream, ray_alloc_cb, ray_read_cb); | |
assert(ngx_queue_empty(&curr->queue)); | |
ngx_queue_insert_tail(&self->fiber_queue, &curr->queue); | |
return ray_suspend(curr); | |
} | |
void ray_write_cb(uv_write_t* req, int status) { | |
ray_fiber_t* curr = container_of(req, ray_fiber_t, r); | |
TRACE("finished writing - curr: %p\n", curr); | |
ray_resume(curr, 1); | |
} | |
int ray_write(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
ray_fiber_t* curr = ray_current(L); | |
size_t len; | |
const char* base = lua_tolstring(L, 2, &len); | |
assert((self->flags & RAY_STARTED) == 0); | |
uv_buf_t buf = uv_buf_init((char*)base, len); | |
TRACE("writing - curr: %p\n", curr); | |
uv_write(&curr->r.write, &self->h.stream, &buf, 1, ray_write_cb); | |
lua_settop(L, 1); | |
return ray_suspend(curr); | |
} | |
void ray_shutdown_cb(uv_shutdown_t* req, int status) { | |
ray_fiber_t* curr = container_of(req, ray_fiber_t, r); | |
TRACE("shutdown: %p\n", curr); | |
lua_settop(curr->L, 0); | |
lua_pushboolean(curr->L, 1); | |
ray_resume(curr, 1); | |
} | |
int ray_shutdown(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
ray_fiber_t* curr = ray_current(L); | |
TRACE("shutdown: self: %p, curr: %p\n", self, curr); | |
uv_shutdown_t* req = &curr->r.shutdown; | |
int rc = uv_shutdown(req, &self->h.stream, ray_shutdown_cb); | |
if (rc) return ray_push_error(L, rc); | |
if (self->flags & RAY_STARTED) { | |
self->flags &= ~RAY_STARTED; | |
uv_read_stop(&self->h.stream); | |
} | |
return ray_suspend(curr); | |
} | |
void ray_listen_cb(uv_stream_t* server, int status) { | |
ray_agent_t* self = container_of(server, ray_agent_t, h); | |
TRACE("agent: %p\n", self); | |
if (!ngx_queue_empty(&self->fiber_queue)) { | |
ngx_queue_t* q = ngx_queue_head(&self->fiber_queue); | |
ray_fiber_t* f = ngx_queue_data(q, ray_fiber_t, queue); | |
ngx_queue_remove(q); | |
ngx_queue_init(q); | |
ray_agent_t* conn = (ray_agent_t*)lua_touserdata(f->L, 2); | |
TRACE("new connection: %p\n", conn); | |
int rc = uv_accept(server, &conn->h.stream); | |
if (rc) { | |
ray_resume(f, ray_push_error(f->L, rc)); | |
return; | |
} | |
ray_resume(f, 1); | |
} | |
else { | |
TRACE("count++\n"); | |
self->count++; | |
} | |
} | |
int ray_listen(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
int backlog = luaL_optint(L, 2, 128); | |
self->count = 0; | |
int rc = uv_listen(&self->h.stream, backlog, ray_listen_cb); | |
if (rc) return ray_push_error(L, rc); | |
return 1; | |
} | |
int ray_accept(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
ray_agent_t* conn = (ray_agent_t*)lua_touserdata(L, 2); | |
TRACE("accepting - count: %i\n", self->count); | |
if (self->count > 0) { | |
self->count--; | |
int rc = uv_accept(&self->h.stream, &conn->h.stream); | |
if (rc) return ray_push_error(L, rc); | |
return 1; | |
} | |
else { | |
ray_fiber_t* curr = ray_current(L); | |
assert(curr != RAY_MAIN); | |
assert(ngx_queue_empty(&curr->queue)); | |
ngx_queue_insert_tail(&self->fiber_queue, &curr->queue); | |
return ray_suspend(curr); | |
} | |
} | |
void ray_connect_cb(uv_connect_t* req, int status) { | |
ray_fiber_t* curr = container_of(req, ray_fiber_t, r); | |
if (status) { | |
ray_resume(curr, ray_push_error(curr->L, status)); | |
} | |
else { | |
lua_pushboolean(curr->L, 1); | |
ray_resume(curr, 1); | |
} | |
} | |
/* ========================================================================== */ | |
/* pipe */ | |
/* ========================================================================== */ | |
int ray_pipe_new(lua_State* L) { | |
int ipc = luaL_optint(L, 1, 0); | |
ray_agent_t* self = ray_agent_new(L); | |
luaL_getmetatable(L, "ray.pipe"); | |
lua_setmetatable(L, -2); | |
uv_pipe_init(uv_default_loop(), &self->h.pipe, ipc); | |
return 1; | |
} | |
int ray_pipe_accept(lua_State* L) { | |
lua_settop(L, 1); | |
ray_pipe_new(L); | |
return ray_accept(L); | |
} | |
int ray_pipe_bind(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
const char* name = luaL_checkstring(L, 2); | |
int rc = uv_pipe_bind(&self->h.pipe, name); | |
if (rc) return ray_push_error(L, rc); | |
lua_settop(L, 1); | |
return 1; | |
} | |
int ray_pipe_open(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
uv_file fh = luaL_checkint(L, 2); | |
int rc = uv_pipe_open(&self->h.pipe, fh); | |
if (rc) return ray_push_error(L, rc); | |
lua_settop(L, 1); | |
return 1; | |
} | |
int ray_pipe_connect(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
ray_fiber_t* curr = ray_current(L); | |
const char* name = luaL_checkstring(L, 2); | |
uv_pipe_connect(&curr->r.connect, &self->h.pipe, name, ray_connect_cb); | |
assert(ngx_queue_empty(&curr->queue)); | |
ngx_queue_insert_tail(&self->fiber_queue, &curr->queue); | |
return ray_suspend(curr); | |
} | |
/* ========================================================================== */ | |
/* TCP */ | |
/* ========================================================================== */ | |
int ray_tcp_new(lua_State* L) { | |
ray_agent_t* self = ray_agent_new(L); | |
luaL_getmetatable(L, "ray.tcp"); | |
lua_setmetatable(L, -2); | |
uv_tcp_init(uv_default_loop(), &self->h.tcp); | |
return 1; | |
} | |
int ray_tcp_accept(lua_State* L) { | |
lua_settop(L, 1); | |
ray_tcp_new(L); | |
return ray_accept(L); | |
} | |
int ray_tcp_connect(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
ray_fiber_t* curr = ray_current(L); | |
struct sockaddr_in addr; | |
const char* host; | |
int port; | |
host = luaL_checkstring(L, 2); | |
port = luaL_checkint(L, 3); | |
addr = uv_ip4_addr(host, port); | |
int rc = uv_tcp_connect(&curr->r.connect, &self->h.tcp, addr, ray_connect_cb); | |
if (rc) return ray_push_error(L, rc); | |
return ray_suspend(curr); | |
} | |
int ray_tcp_bind(lua_State* L) { | |
ray_agent_t *self = (ray_agent_t*)lua_touserdata(L, 1); | |
struct sockaddr_in addr; | |
const char* host; | |
int port; | |
host = luaL_checkstring(L, 2); | |
port = luaL_checkint(L, 3); | |
addr = uv_ip4_addr(host, port); | |
int rc = uv_tcp_bind(&self->h.tcp, addr); | |
if (rc) return ray_push_error(L, rc); | |
lua_settop(L, 1); | |
return 1; | |
} | |
int ray_tcp_nodelay(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.tcp"); | |
luaL_checktype(L, 2, LUA_TBOOLEAN); | |
int enable = lua_toboolean(L, 2); | |
lua_settop(L, 2); | |
int rc = uv_tcp_nodelay(&self->h.tcp, enable); | |
if (rc) return ray_push_error(L, rc); | |
lua_settop(L, 1); | |
return 1; | |
} | |
int ray_tcp_keepalive(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.tcp"); | |
luaL_checktype(L, 2, LUA_TBOOLEAN); | |
int enable = lua_toboolean(L, 2); | |
unsigned int delay = 0; | |
if (enable) delay = luaL_checkint(L, 3); | |
int rc = uv_tcp_keepalive(&self->h.tcp, enable, delay); | |
if (rc) return ray_push_error(L, rc); | |
lua_settop(L, 1); | |
return 1; | |
} | |
int ray_tcp_getsockname(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.tcp"); | |
int port = 0; | |
char ip[INET6_ADDRSTRLEN]; | |
int family; | |
struct sockaddr_storage addr; | |
int len = sizeof(addr); | |
int rc = uv_tcp_getsockname(&self->h.tcp, (struct sockaddr*)&addr, &len); | |
if (rc) return ray_push_error(L, rc); | |
family = addr.ss_family; | |
if (family == AF_INET) { | |
struct sockaddr_in* addrin = (struct sockaddr_in*)&addr; | |
uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN); | |
port = ntohs(addrin->sin_port); | |
} | |
else if (family == AF_INET6) { | |
struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&addr; | |
uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN); | |
port = ntohs(addrin6->sin6_port); | |
} | |
lua_newtable(L); | |
lua_pushnumber(L, port); | |
lua_setfield(L, -2, "port"); | |
lua_pushnumber(L, family); | |
lua_setfield(L, -2, "family"); | |
lua_pushstring(L, ip); | |
lua_setfield(L, -2, "address"); | |
return 1; | |
} | |
int ray_tcp_getpeername(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.tcp"); | |
int port = 0; | |
char ip[INET6_ADDRSTRLEN]; | |
int family; | |
struct sockaddr_storage addr; | |
int len = sizeof(addr); | |
int rc = uv_tcp_getpeername(&self->h.tcp, (struct sockaddr*)&addr, &len); | |
if (rc) return ray_push_error(L, rc); | |
family = addr.ss_family; | |
if (family == AF_INET) { | |
struct sockaddr_in* addrin = (struct sockaddr_in*)&addr; | |
uv_inet_ntop(AF_INET, &(addrin->sin_addr), ip, INET6_ADDRSTRLEN); | |
port = ntohs(addrin->sin_port); | |
} | |
else if (family == AF_INET6) { | |
struct sockaddr_in6* addrin6 = (struct sockaddr_in6*)&addr; | |
uv_inet_ntop(AF_INET6, &(addrin6->sin6_addr), ip, INET6_ADDRSTRLEN); | |
port = ntohs(addrin6->sin6_port); | |
} | |
lua_newtable(L); | |
lua_pushnumber(L, port); | |
lua_setfield(L, -2, "port"); | |
lua_pushnumber(L, family); | |
lua_setfield(L, -2, "family"); | |
lua_pushstring(L, ip); | |
lua_setfield(L, -2, "address"); | |
return 1; | |
} | |
/* ========================================================================== */ | |
/* UDP */ | |
/* ========================================================================== */ | |
int ray_udp_new(lua_State* L) { | |
ray_agent_t* self = ray_agent_new(L); | |
luaL_getmetatable(L, "ray.udp"); | |
lua_setmetatable(L, -2); | |
uv_udp_init(uv_default_loop(), &self->h.udp); | |
return 1; | |
} | |
int ray_udp_bind(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.udp"); | |
const char* host = luaL_checkstring(L, 2); | |
int port = luaL_checkint(L, 3); | |
int flags = 0; | |
struct sockaddr_in address = uv_ip4_addr(host, port); | |
int rc = uv_udp_bind(&self->h.udp, address, flags); | |
if (rc) return ray_push_error(L, rc); | |
lua_settop(L, 1); | |
return 1; | |
} | |
void ray_udp_send_cb(uv_udp_send_t* req, int status) { | |
ray_fiber_t* curr = container_of(req, ray_fiber_t, r); | |
if (status < 0) { | |
ray_resume(curr, ray_push_error(curr->L, status)); | |
} | |
else { | |
ray_resume(curr, 0); | |
} | |
} | |
int ray_udp_send(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.udp"); | |
ray_fiber_t* curr = ray_current(L); | |
size_t len; | |
const char* host = luaL_checkstring(L, 2); | |
int port = luaL_checkint(L, 3); | |
const char* mesg = luaL_checklstring(L, 4, &len); | |
uv_buf_t buf = uv_buf_init((char*)mesg, len); | |
struct sockaddr_in addr = uv_ip4_addr(host, port); | |
int rc = uv_udp_send(&curr->r.udp_send, &self->h.udp, &buf, 1, addr, ray_udp_send_cb); | |
if (rc) return ray_push_error(L, rc); | |
return ray_suspend(curr); | |
} | |
void ray_udp_recv_cb(uv_udp_t* handle, ssize_t nread, uv_buf_t buf, | |
struct sockaddr* peer, unsigned flags) { | |
ray_agent_t* self = container_of(handle, ray_agent_t, h); | |
if (nread == 0) return; | |
char host[INET6_ADDRSTRLEN]; | |
int port = 0; | |
TRACE("here 1\n"); | |
if (nread < 0) { | |
TRACE("here 2\n"); | |
if (self->flags & RAY_STARTED) { | |
self->flags &= ~RAY_STARTED; | |
uv_udp_recv_stop(&self->h.udp); | |
} | |
TRACE("here 3\n"); | |
while (!ngx_queue_empty(&self->fiber_queue)) { | |
ngx_queue_t* q = ngx_queue_head(&self->fiber_queue); | |
ray_fiber_t* f = ngx_queue_data(q, ray_fiber_t, queue); | |
ngx_queue_remove(q); | |
ngx_queue_init(q); | |
TRACE("here 4\n"); | |
ray_resume(f, ray_push_error(f->L, nread)); | |
} | |
return; | |
} | |
TRACE("here 5\n"); | |
if (peer->sa_family == PF_INET) { | |
struct sockaddr_in* addr = (struct sockaddr_in*)peer; | |
uv_ip4_name(addr, host, INET6_ADDRSTRLEN); | |
port = addr->sin_port; | |
} | |
else if (peer->sa_family == PF_INET6) { | |
struct sockaddr_in6* addr = (struct sockaddr_in6*)peer; | |
uv_ip6_name(addr, host, INET6_ADDRSTRLEN); | |
port = addr->sin6_port; | |
} | |
TRACE("here 6\n"); | |
if (!ngx_queue_empty(&self->fiber_queue)) { | |
ngx_queue_t* q = ngx_queue_head(&self->fiber_queue); | |
ray_fiber_t* f = ngx_queue_data(q, ray_fiber_t, queue); | |
ngx_queue_remove(q); | |
ngx_queue_init(q); | |
lua_checkstack(f->L, 3); | |
lua_pushlstring(f->L, (char*)buf.base, nread); | |
lua_pushstring(f->L, host); | |
lua_pushinteger(f->L, port); | |
ray_buf_clear(&self->buf); | |
ray_resume(f, 3); | |
} | |
else { | |
TRACE("here 7: size: %u, offs: %u\n", (uint32_t)self->buf.size, (uint32_t)self->buf.offs); | |
memcpy(self->buf.base, (char*)(void*)&nread, sizeof(nread)); | |
ray_buf_write(&self->buf, host, strlen(host) + 1); | |
ray_buf_write(&self->buf, (char*)(void*)&port, sizeof(port)); | |
self->count++; | |
} | |
} | |
uv_buf_t ray_udp_alloc_cb(uv_handle_t* handle, size_t len) { | |
ray_agent_t* self = container_of(handle, ray_agent_t, h); | |
ray_buf_need(&self->buf, sizeof(ssize_t) + len); | |
self->buf.offs += sizeof(ssize_t); | |
return uv_buf_init(self->buf.base + sizeof(ssize_t), len); | |
} | |
int ray_udp_recv(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.udp"); | |
ray_fiber_t* curr = ray_current(L); | |
if (!(self->flags & RAY_STARTED)) { | |
int rc = uv_udp_recv_start(&self->h.udp, ray_udp_alloc_cb, ray_udp_recv_cb); | |
if (rc) return ray_push_error(L, rc); | |
self->flags |= RAY_STARTED; | |
} | |
if (self->count > 0) { | |
ssize_t slen; | |
int port; | |
size_t ulen; | |
const char* mesg = ray_buf_read(&self->buf, &ulen); | |
memcpy(&slen, mesg, sizeof(ssize_t)); | |
mesg += sizeof(ssize_t); | |
lua_pushlstring(L, mesg, slen); | |
mesg += slen; | |
lua_pushstring(L, mesg); | |
mesg += strlen(mesg) + 1; | |
memcpy(&port, mesg, sizeof(int)); | |
lua_pushinteger(L, port); | |
ray_buf_clear(&self->buf); | |
self->count--; | |
return 3; | |
} | |
assert(ngx_queue_empty(&curr->queue)); | |
ngx_queue_insert_tail(&self->fiber_queue, &curr->queue); | |
return ray_suspend(curr); | |
} | |
static const char* RAY_UDP_OPTS[] = { "join", "leave", NULL }; | |
int ray_udp_membership(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.udp"); | |
const char* iaddr = luaL_checkstring(L, 3); | |
const char* maddr = luaL_checkstring(L, 2); | |
int option = luaL_checkoption(L, 4, NULL, RAY_UDP_OPTS); | |
uv_membership membership = option ? UV_LEAVE_GROUP : UV_JOIN_GROUP; | |
int rc = uv_udp_set_membership(&self->h.udp, maddr, iaddr, membership); | |
if (rc) return ray_push_error(L, rc); | |
return 1; | |
} | |
/* ========================================================================== */ | |
/* file system */ | |
/* ========================================================================== */ | |
/* shamelessly stolen from luvit - bugs are my own */ | |
static int string_to_flags(lua_State* L, const char* str) { | |
if (strcmp(str, "r") == 0) | |
return O_RDONLY; | |
if (strcmp(str, "r+") == 0) | |
return O_RDWR; | |
if (strcmp(str, "w") == 0) | |
return O_CREAT | O_TRUNC | O_WRONLY; | |
if (strcmp(str, "w+") == 0) | |
return O_CREAT | O_TRUNC | O_RDWR; | |
if (strcmp(str, "a") == 0) | |
return O_APPEND | O_CREAT | O_WRONLY; | |
if (strcmp(str, "a+") == 0) | |
return O_APPEND | O_CREAT | O_RDWR; | |
return luaL_error(L, "Unknown file open flag: '%s'", str); | |
} | |
/* shamelessly stolen from luvit - bugs are my own */ | |
static void push_stats_table(lua_State* L, struct stat* s) { | |
lua_newtable(L); | |
lua_pushinteger(L, s->st_dev); | |
lua_setfield(L, -2, "dev"); | |
lua_pushinteger(L, s->st_ino); | |
lua_setfield(L, -2, "ino"); | |
lua_pushinteger(L, s->st_mode); | |
lua_setfield(L, -2, "mode"); | |
lua_pushinteger(L, s->st_nlink); | |
lua_setfield(L, -2, "nlink"); | |
lua_pushinteger(L, s->st_uid); | |
lua_setfield(L, -2, "uid"); | |
lua_pushinteger(L, s->st_gid); | |
lua_setfield(L, -2, "gid"); | |
lua_pushinteger(L, s->st_rdev); | |
lua_setfield(L, -2, "rdev"); | |
lua_pushinteger(L, s->st_size); | |
lua_setfield(L, -2, "size"); | |
#ifdef __POSIX__ | |
lua_pushinteger(L, s->st_blksize); | |
lua_setfield(L, -2, "blksize"); | |
lua_pushinteger(L, s->st_blocks); | |
lua_setfield(L, -2, "blocks"); | |
#endif | |
lua_pushinteger(L, s->st_atime); | |
lua_setfield(L, -2, "atime"); | |
lua_pushinteger(L, s->st_mtime); | |
lua_setfield(L, -2, "mtime"); | |
lua_pushinteger(L, s->st_ctime); | |
lua_setfield(L, -2, "ctime"); | |
#ifndef _WIN32 | |
lua_pushboolean(L, S_ISREG(s->st_mode)); | |
lua_setfield(L, -2, "is_file"); | |
lua_pushboolean(L, S_ISDIR(s->st_mode)); | |
lua_setfield(L, -2, "is_directory"); | |
lua_pushboolean(L, S_ISCHR(s->st_mode)); | |
lua_setfield(L, -2, "is_character_device"); | |
lua_pushboolean(L, S_ISBLK(s->st_mode)); | |
lua_setfield(L, -2, "is_block_device"); | |
lua_pushboolean(L, S_ISFIFO(s->st_mode)); | |
lua_setfield(L, -2, "is_fifo"); | |
lua_pushboolean(L, S_ISLNK(s->st_mode)); | |
lua_setfield(L, -2, "is_symbolic_link"); | |
lua_pushboolean(L, S_ISSOCK(s->st_mode)); | |
lua_setfield(L, -2, "is_socket"); | |
#endif | |
} | |
/* mostly stolen from luvit - bugs are my own */ | |
static void fs_result(lua_State* L, uv_fs_t* req) { | |
if (req->result < 0) { | |
lua_pushnil(L); | |
lua_pushinteger(L, (uv_errno_t)req->result); | |
} | |
else { | |
switch (req->fs_type) { | |
case UV_FS_RENAME: | |
case UV_FS_UNLINK: | |
case UV_FS_RMDIR: | |
case UV_FS_MKDIR: | |
case UV_FS_FSYNC: | |
case UV_FS_FTRUNCATE: | |
case UV_FS_FDATASYNC: | |
case UV_FS_LINK: | |
case UV_FS_SYMLINK: | |
case UV_FS_CHMOD: | |
case UV_FS_FCHMOD: | |
case UV_FS_CHOWN: | |
case UV_FS_FCHOWN: | |
case UV_FS_UTIME: | |
case UV_FS_FUTIME: | |
case UV_FS_CLOSE: | |
lua_pushinteger(L, req->result); | |
break; | |
case UV_FS_OPEN: { | |
ray_agent_t* self = ray_agent_new(L); | |
luaL_getmetatable(L, "ray.file"); | |
lua_setmetatable(L, -2); | |
self->h.file = req->result; | |
break; | |
} | |
case UV_FS_READ: { | |
lua_pushinteger(L, req->result); | |
lua_pushlstring(L, (const char*)req->data, req->result); | |
free(req->data); | |
req->data = NULL; | |
break; | |
} | |
case UV_FS_WRITE: | |
lua_pushinteger(L, req->result); | |
break; | |
case UV_FS_READLINK: | |
lua_pushstring(L, (char*)req->ptr); | |
break; | |
case UV_FS_READDIR: { | |
int i; | |
char* namep = (char*)req->ptr; | |
int count = req->result; | |
lua_newtable(L); | |
for (i = 1; i <= count; i++) { | |
lua_pushstring(L, namep); | |
lua_rawseti(L, -2, i); | |
namep += strlen(namep) + 1; /* +1 for '\0' */ | |
} | |
break; | |
} | |
case UV_FS_STAT: | |
case UV_FS_LSTAT: | |
case UV_FS_FSTAT: | |
push_stats_table(L, (struct stat*)req->ptr); | |
break; | |
default: | |
luaL_error(L, "Unhandled fs_type"); | |
} | |
} | |
uv_fs_req_cleanup(req); | |
} | |
void ray_fs_cb(uv_fs_t* req) { | |
ray_fiber_t* curr = container_of(req, ray_fiber_t, r); | |
fs_result(curr->L, req); | |
ray_resume(curr, LUA_MULTRET); | |
} | |
#define RAY_FS_CALL(L, func, misc, ...) do { \ | |
ray_fiber_t* curr = ray_current(L); \ | |
uv_loop_t* loop = uv_default_loop(); \ | |
uv_fs_t* req = &curr->r.fs; \ | |
req->data = misc; \ | |
uv_fs_cb cb = (curr == RAY_MAIN) ? NULL : ray_fs_cb ; \ | |
int rc = uv_fs_##func(loop, req, __VA_ARGS__, cb); \ | |
if (rc) return ray_push_error(L, rc); \ | |
if (cb) return ray_suspend(curr); \ | |
else { \ | |
fs_result(L, req); \ | |
return lua_gettop(L); \ | |
} \ | |
} while(0) | |
int ray_fs_open(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
int flags = string_to_flags(L, luaL_checkstring(L, 2)); | |
int mode = strtoul(luaL_checkstring(L, 3), NULL, 8); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, open, NULL, path, flags, mode); | |
} | |
int ray_fs_unlink(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, unlink, NULL, path); | |
} | |
int ray_fs_mkdir(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
int mode = strtoul(luaL_checkstring(L, 2), NULL, 8); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, mkdir, NULL, path, mode); | |
} | |
int ray_fs_rmdir(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, rmdir, NULL, path); | |
} | |
int ray_fs_readdir(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, readdir, NULL, path, 0); | |
} | |
int ray_fs_stat(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, stat, NULL, path); | |
} | |
int ray_fs_rename(lua_State* L) { | |
const char* old_path = luaL_checkstring(L, 1); | |
const char* new_path = luaL_checkstring(L, 2); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, rename, NULL, old_path, new_path); | |
} | |
int ray_fs_sendfile(lua_State* L) { | |
ray_agent_t* o_file = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
ray_agent_t* i_file = (ray_agent_t*)luaL_checkudata(L, 2, "ray.file"); | |
off_t ofs = luaL_checkint(L, 3); | |
size_t len = luaL_checkint(L, 4); | |
lua_settop(L, 2); | |
RAY_FS_CALL(L, sendfile, NULL, o_file->h.file, i_file->h.file, ofs, len); | |
} | |
int ray_fs_chmod(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
int mode = strtoul(luaL_checkstring(L, 2), NULL, 8); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, chmod, NULL, path, mode); | |
} | |
int ray_fs_utime(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
double atime = luaL_checknumber(L, 2); | |
double mtime = luaL_checknumber(L, 3); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, utime, NULL, path, atime, mtime); | |
} | |
int ray_fs_lstat(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, lstat, NULL, path); | |
} | |
int ray_fs_link(lua_State* L) { | |
const char* src_path = luaL_checkstring(L, 1); | |
const char* dst_path = luaL_checkstring(L, 2); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, link, NULL, src_path, dst_path); | |
} | |
int ray_fs_symlink(lua_State* L) { | |
const char* src_path = luaL_checkstring(L, 1); | |
const char* dst_path = luaL_checkstring(L, 2); | |
int flags = string_to_flags(L, luaL_checkstring(L, 3)); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, symlink, NULL, src_path, dst_path, flags); | |
} | |
int ray_fs_readlink(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, readlink, NULL, path); | |
} | |
int ray_fs_chown(lua_State* L) { | |
const char* path = luaL_checkstring(L, 1); | |
int uid = luaL_checkint(L, 2); | |
int gid = luaL_checkint(L, 3); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, chown, NULL, path, uid, gid); | |
} | |
int ray_fs_cwd(lua_State* L) { | |
char buffer[RAY_MAX_PATH]; | |
int err = uv_cwd(buffer, RAY_MAX_PATH); | |
if (err) { | |
return luaL_error(L, uv_strerror(err)); | |
} | |
lua_pushstring(L, buffer); | |
return 1; | |
} | |
int ray_fs_chdir(lua_State* L) { | |
const char* dir = luaL_checkstring(L, 1); | |
int err = uv_chdir(dir); | |
if (err) { | |
return luaL_error(L, uv_strerror(err)); | |
} | |
return 0; | |
} | |
int ray_fs_exepath(lua_State* L) { | |
char buffer[RAY_MAX_PATH]; | |
size_t len = RAY_MAX_PATH; | |
uv_exepath(buffer, &len); | |
lua_pushlstring(L, buffer, len); | |
return 1; | |
} | |
/* file instance methods */ | |
int ray_file_stat(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, fstat, NULL, self->h.file); | |
} | |
int ray_file_sync(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, fsync, NULL, self->h.file); | |
} | |
int ray_file_datasync(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, fdatasync, NULL, self->h.file); | |
} | |
int ray_file_truncate(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
off_t ofs = luaL_checkint(L, 2); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, ftruncate, NULL, self->h.file, ofs); | |
} | |
int ray_file_utime(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
double atime = luaL_checknumber(L, 2); | |
double mtime = luaL_checknumber(L, 3); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, futime, NULL, self->h.file, atime, mtime); | |
} | |
int ray_file_chmod(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
int mode = strtoul(luaL_checkstring(L, 2), NULL, 8); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, fchmod, NULL, self->h.file, mode); | |
} | |
int ray_file_chown(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
int uid = luaL_checkint(L, 2); | |
int gid = luaL_checkint(L, 3); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, fchown, NULL, self->h.file, uid, gid); | |
} | |
int ray_file_read(lua_State *L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
size_t len = luaL_optint(L, 2, RAY_BUF_SIZE); | |
int64_t ofs = luaL_optint(L, 3, -1); | |
void* buf = malloc(len); /* free from ctx->r.fs_req.data in cb */ | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, read, buf, self->h.file, buf, len, ofs); | |
} | |
int ray_file_write(lua_State *L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
size_t len; | |
void* buf = (void*)luaL_checklstring(L, 2, &len); | |
uint64_t ofs = luaL_optint(L, 3, 0); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, write, NULL, self->h.file, buf, len, ofs); | |
} | |
int ray_file_close(lua_State *L) { | |
ray_agent_t* self = (ray_agent_t*)luaL_checkudata(L, 1, "ray.file"); | |
lua_settop(L, 0); | |
RAY_FS_CALL(L, close, NULL, self->h.file); | |
} | |
/* ========================================================================== */ | |
/* utils */ | |
/* ========================================================================== */ | |
void ray_getaddrinfo_cb(uv_getaddrinfo_t* req, int s, struct addrinfo* ai) { | |
ray_fiber_t* self = container_of(req, ray_fiber_t, r); | |
char host[INET6_ADDRSTRLEN]; | |
int port = 0; | |
if (ai->ai_family == PF_INET) { | |
struct sockaddr_in* addr = (struct sockaddr_in*)ai->ai_addr; | |
uv_ip4_name(addr, host, INET6_ADDRSTRLEN); | |
port = addr->sin_port; | |
} | |
else if (ai->ai_family == PF_INET6) { | |
struct sockaddr_in6* addr = (struct sockaddr_in6*)ai->ai_addr; | |
uv_ip6_name(addr, host, INET6_ADDRSTRLEN); | |
port = addr->sin6_port; | |
} | |
lua_settop(self->L, 0); | |
lua_pushstring (self->L, host); | |
lua_pushinteger(self->L, port); | |
uv_freeaddrinfo(ai); | |
ray_resume(self, 2); | |
} | |
int ray_getaddrinfo(lua_State* L) { | |
ray_fiber_t* curr = ray_current(L); | |
uv_getaddrinfo_t* req = &curr->r.getaddrinfo; | |
const char* node = NULL; | |
const char* service = NULL; | |
struct addrinfo hints; | |
if (!lua_isnoneornil(L, 1)) { | |
node = luaL_checkstring(L, 1); | |
} | |
if (!lua_isnoneornil(L, 2)) { | |
service = luaL_checkstring(L, 2); | |
} | |
if (node == NULL && service == NULL) { | |
return luaL_error(L, "getaddrinfo: provide either node or service"); | |
} | |
hints.ai_family = PF_INET; | |
hints.ai_socktype = SOCK_STREAM; | |
hints.ai_protocol = IPPROTO_TCP; | |
hints.ai_flags = 0; | |
if (lua_istable(L, 3)) { | |
lua_getfield(L, 3, "family"); | |
if (!lua_isnil(L, -1)) { | |
const char* s = lua_tostring(L, -1); | |
if (strcmp(s, "INET") == 0) { | |
hints.ai_family = PF_INET; | |
} | |
else if (strcmp(s, "INET6") == 0) { | |
hints.ai_family = PF_INET6; | |
} | |
else { | |
return luaL_error(L, "unsupported family: %s", s); | |
} | |
} | |
lua_pop(L, 1); | |
lua_getfield(L, 3, "socktype"); | |
if (!lua_isnil(L, -1)) { | |
const char* s = lua_tostring(L, -1); | |
if (strcmp(s, "STREAM") == 0) { | |
hints.ai_socktype = SOCK_STREAM; | |
} | |
else if (strcmp(s, "DGRAM")) { | |
hints.ai_socktype = SOCK_DGRAM; | |
} | |
else { | |
return luaL_error(L, "unsupported socktype: %s", s); | |
} | |
} | |
lua_pop(L, 1); | |
lua_getfield(L, 3, "protocol"); | |
if (!lua_isnil(L, -1)) { | |
const char* s = lua_tostring(L, -1); | |
if (strcmp(s, "TCP") == 0) { | |
hints.ai_protocol = IPPROTO_TCP; | |
} | |
else if (strcmp(s, "UDP") == 0) { | |
hints.ai_protocol = IPPROTO_UDP; | |
} | |
else { | |
return luaL_error(L, "unsupported protocol: %s", s); | |
} | |
} | |
lua_pop(L, 1); | |
} | |
uv_loop_t* loop = uv_default_loop(); | |
int rc = uv_getaddrinfo(loop, req, ray_getaddrinfo_cb, node, service, &hints); | |
if (rc) return ray_push_error(L, rc); | |
return ray_suspend(curr); | |
} | |
int ray_mem_free(lua_State* L) { | |
lua_pushinteger(L, uv_get_free_memory()); | |
return 1; | |
} | |
int ray_mem_total(lua_State* L) { | |
lua_pushinteger(L, uv_get_total_memory()); | |
return 1; | |
} | |
int ray_hrtime(lua_State* L) { | |
lua_pushinteger(L, uv_hrtime()); | |
return 1; | |
} | |
int ray_cpu_info(lua_State* L) { | |
int size, i; | |
uv_cpu_info_t* info; | |
int err = uv_cpu_info(&info, &size); | |
lua_settop(L, 0); | |
if (err) { | |
lua_pushboolean(L, 0); | |
luaL_error(L, uv_strerror(err)); | |
return 2; | |
} | |
lua_newtable(L); | |
for (i = 0; i < size; i++) { | |
lua_newtable(L); | |
lua_pushstring(L, info[i].model); | |
lua_setfield(L, -2, "model"); | |
lua_pushinteger(L, (lua_Integer)info[i].speed); | |
lua_setfield(L, -2, "speed"); | |
lua_newtable(L); /* times */ | |
lua_pushinteger(L, (lua_Integer)info[i].cpu_times.user); | |
lua_setfield(L, -2, "user"); | |
lua_pushinteger(L, (lua_Integer)info[i].cpu_times.nice); | |
lua_setfield(L, -2, "nice"); | |
lua_pushinteger(L, (lua_Integer)info[i].cpu_times.sys); | |
lua_setfield(L, -2, "sys"); | |
lua_pushinteger(L, (lua_Integer)info[i].cpu_times.idle); | |
lua_setfield(L, -2, "idle"); | |
lua_pushinteger(L, (lua_Integer)info[i].cpu_times.irq); | |
lua_setfield(L, -2, "irq"); | |
lua_setfield(L, -2, "times"); | |
lua_rawseti(L, 1, i + 1); | |
} | |
uv_free_cpu_info(info, size); | |
return 1; | |
} | |
int ray_interfaces(lua_State* L) { | |
int size, i; | |
char buf[INET6_ADDRSTRLEN]; | |
uv_interface_address_t* info; | |
int err = uv_interface_addresses(&info, &size); | |
lua_settop(L, 0); | |
if (err) { | |
lua_pushboolean(L, 0); | |
luaL_error(L, uv_strerror(err)); | |
return 2; | |
} | |
lua_newtable(L); | |
for (i = 0; i < size; i++) { | |
uv_interface_address_t addr = info[i]; | |
lua_newtable(L); | |
lua_pushstring(L, addr.name); | |
lua_setfield(L, -2, "name"); | |
lua_pushboolean(L, addr.is_internal); | |
lua_setfield(L, -2, "is_internal"); | |
if (addr.address.address4.sin_family == PF_INET) { | |
uv_ip4_name(&addr.address.address4, buf, sizeof(buf)); | |
} | |
else if (addr.address.address4.sin_family == PF_INET6) { | |
uv_ip6_name(&addr.address.address6, buf, sizeof(buf)); | |
} | |
lua_pushstring(L, buf); | |
lua_setfield(L, -2, "address"); | |
lua_rawseti(L, -2, i + 1); | |
} | |
uv_free_interface_addresses(info, size); | |
return 1; | |
} | |
int ray_agent_free(lua_State* L) { | |
ray_agent_t* self = (ray_agent_t*)lua_touserdata(L, 1); | |
TRACE("FREE: %p\n", self); | |
if (self->buf.base) { | |
free(self->buf.base); | |
self->buf.base = NULL; | |
self->buf.size = 0; | |
self->buf.offs = 0; | |
} | |
return 0; | |
} | |
static luaL_Reg ray_funcs[] = { | |
{"fiber", ray_fiber_new}, | |
{"timer", ray_timer_new}, | |
{"pipe", ray_pipe_new}, | |
{"tcp", ray_tcp_new}, | |
{"udp", ray_udp_new}, | |
{"open", ray_fs_open}, | |
{"unlink", ray_fs_unlink}, | |
{"mkdir", ray_fs_mkdir}, | |
{"rmdir", ray_fs_rmdir}, | |
{"readdir", ray_fs_readdir}, | |
{"stat", ray_fs_stat}, | |
{"rename", ray_fs_rename}, | |
{"sendfile", ray_fs_sendfile}, | |
{"chmod", ray_fs_chmod}, | |
{"chown", ray_fs_chown}, | |
{"utime", ray_fs_utime}, | |
{"lstat", ray_fs_lstat}, | |
{"link", ray_fs_link}, | |
{"symlink", ray_fs_symlink}, | |
{"readlink", ray_fs_readlink}, | |
{"cwd", ray_fs_cwd}, | |
{"chdir", ray_fs_chdir}, | |
{"exepath", ray_fs_exepath}, | |
{"cpu_info", ray_cpu_info}, | |
{"mem_free", ray_mem_free}, | |
{"mem_total", ray_mem_total}, | |
{"hrtime", ray_hrtime}, | |
{"interfaces",ray_interfaces}, | |
{"getaddrinfo",ray_getaddrinfo}, | |
{"run", ray_run}, | |
{NULL, NULL} | |
}; | |
static luaL_Reg ray_fiber_meths[] = { | |
{"join", ray_fiber_join}, | |
{"ready", ray_fiber_ready}, | |
{NULL, NULL} | |
}; | |
static luaL_Reg ray_timer_meths[] = { | |
{"close", ray_close}, | |
{"start", ray_timer_start}, | |
{"stop", ray_timer_stop}, | |
{"wait", ray_timer_wait}, | |
{"__gc", ray_agent_free}, | |
{NULL, NULL} | |
}; | |
static luaL_Reg ray_pipe_meths[] = { | |
{"read", ray_read}, | |
{"write", ray_write}, | |
{"close", ray_close}, | |
{"listen", ray_listen}, | |
{"shutdown", ray_shutdown}, | |
{"open", ray_pipe_open}, | |
{"accept", ray_pipe_accept}, | |
{"bind", ray_pipe_bind}, | |
{"connect", ray_pipe_connect}, | |
{"__gc", ray_agent_free}, | |
{NULL, NULL} | |
}; | |
static luaL_Reg ray_tcp_meths[] = { | |
{"read", ray_read}, | |
{"write", ray_write}, | |
{"close", ray_close}, | |
{"listen", ray_listen}, | |
{"shutdown", ray_shutdown}, | |
{"accept", ray_tcp_accept}, | |
{"bind", ray_tcp_bind}, | |
{"connect", ray_tcp_connect}, | |
{"nodelay", ray_tcp_nodelay}, | |
{"keepalive", ray_tcp_keepalive}, | |
{"getpeername", ray_tcp_getpeername}, | |
{"getsockname", ray_tcp_getsockname}, | |
{"__gc", ray_agent_free}, | |
{NULL, NULL} | |
}; | |
static luaL_Reg ray_udp_meths[] = { | |
{"send", ray_udp_send}, | |
{"recv", ray_udp_recv}, | |
{"bind", ray_udp_bind}, | |
{"membership",ray_udp_membership}, | |
{"__gc", ray_agent_free}, | |
{NULL, NULL} | |
}; | |
static luaL_Reg ray_file_meths[] = { | |
{"read", ray_file_read}, | |
{"write", ray_file_write}, | |
{"close", ray_file_close}, | |
{"stat", ray_file_stat}, | |
{"sync", ray_file_sync}, | |
{"utime", ray_file_utime}, | |
{"chmod", ray_file_chmod}, | |
{"chown", ray_file_chown}, | |
{"datasync", ray_file_datasync}, | |
{"truncate", ray_file_truncate}, | |
{NULL, NULL} | |
}; | |
void ray_pump_cb(uv_async_t* async, int status) { | |
(void)async; | |
(void)status; | |
} | |
LUALIB_API int luaopen_ray(lua_State* L) { | |
#ifndef WIN32 | |
signal(SIGPIPE, SIG_IGN); | |
#endif | |
lua_settop(L, 0); | |
luaL_newmetatable(L, "ray.fiber"); | |
luaL_register(L, NULL, ray_fiber_meths); | |
lua_pushvalue(L, -1); | |
lua_setfield(L, -2, "__index"); | |
lua_pop(L, 1); | |
luaL_newmetatable(L, "ray.timer"); | |
luaL_register(L, NULL, ray_timer_meths); | |
lua_pushvalue(L, -1); | |
lua_setfield(L, -2, "__index"); | |
lua_pop(L, 1); | |
luaL_newmetatable(L, "ray.pipe"); | |
luaL_register(L, NULL, ray_pipe_meths); | |
lua_pushvalue(L, -1); | |
lua_setfield(L, -2, "__index"); | |
lua_pop(L, 1); | |
luaL_newmetatable(L, "ray.tcp"); | |
luaL_register(L, NULL, ray_tcp_meths); | |
lua_pushvalue(L, -1); | |
lua_setfield(L, -2, "__index"); | |
lua_pop(L, 1); | |
luaL_newmetatable(L, "ray.udp"); | |
luaL_register(L, NULL, ray_udp_meths); | |
lua_pushvalue(L, -1); | |
lua_setfield(L, -2, "__index"); | |
lua_pop(L, 1); | |
luaL_newmetatable(L, "ray.file"); | |
luaL_register(L, NULL, ray_file_meths); | |
lua_pushvalue(L, -1); | |
lua_setfield(L, -2, "__index"); | |
lua_pop(L, 1); | |
uv_async_init(uv_default_loop(), &RAY_PUMP, ray_pump_cb); | |
uv_unref((uv_handle_t*)&RAY_PUMP); | |
RAY_MAIN = (ray_fiber_t*)malloc(sizeof(ray_fiber_t)); | |
RAY_MAIN->L = L; | |
RAY_MAIN->ref = LUA_NOREF; | |
RAY_MAIN->flags = RAY_STARTED; | |
ngx_queue_init(&RAY_MAIN->queue); | |
ngx_queue_init(&RAY_MAIN->fiber_queue); | |
luaL_register(L, "ray", ray_funcs); | |
return 1; | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdlib.h> | |
#include <stddef.h> | |
#include <string.h> | |
#include <assert.h> | |
#include "lua.h" | |
#include "lualib.h" | |
#include "lauxlib.h" | |
#ifdef WIN32 | |
# define UNUSED /* empty */ | |
# define INLINE __inline | |
#else | |
# define UNUSED __attribute__((unused)) | |
# define INLINE inline | |
#endif | |
#define RAY_DEBUG | |
#define NGX_DEBUG | |
#include "uv/include/uv.h" | |
#include "ngx-queue.h" | |
#ifdef RAY_DEBUG | |
# define TRACE(fmt, ...) do { \ | |
fprintf(stderr, "%s: %d: %s: " fmt, \ | |
__FILE__, __LINE__, __func__, ##__VA_ARGS__); \ | |
} while (0) | |
#else | |
# define TRACE(fmt, ...) ((void)0) | |
#endif /* RAY_DEBUG */ | |
typedef union ray_handle_u { | |
uv_handle_t handle; | |
uv_stream_t stream; | |
uv_tcp_t tcp; | |
uv_pipe_t pipe; | |
uv_prepare_t prepare; | |
uv_check_t check; | |
uv_idle_t idle; | |
uv_async_t async; | |
uv_timer_t timer; | |
uv_fs_event_t fs_event; | |
uv_fs_poll_t fs_poll; | |
uv_poll_t poll; | |
uv_process_t process; | |
uv_tty_t tty; | |
uv_udp_t udp; | |
uv_file file; | |
} ray_handle_t; | |
typedef union ray_req_u { | |
uv_req_t req; | |
uv_write_t write; | |
uv_connect_t connect; | |
uv_shutdown_t shutdown; | |
uv_fs_t fs; | |
uv_work_t work; | |
uv_udp_send_t udp_send; | |
uv_getaddrinfo_t getaddrinfo; | |
} ray_req_t; | |
/* default buffer size for read operations */ | |
#define RAY_BUF_SIZE 4096 | |
/* max path length */ | |
#define RAY_MAX_PATH 1024 | |
#define container_of(ptr, type, member) \ | |
((type*) ((char*)(ptr) - offsetof(type, member))) | |
/* lifted from luasys */ | |
#define ray_boxpointer(L,u) \ | |
(*(void**) (lua_newuserdata(L, sizeof(void*))) = (u)) | |
#define ray_unboxpointer(L,i) \ | |
(*(void**) (lua_touserdata(L, i))) | |
/* lifted from luasys */ | |
#define ray_boxinteger(L,n) \ | |
(*(lua_Integer*) (lua_newuserdata(L, sizeof(lua_Integer))) = (lua_Integer)(n)) | |
#define ray_unboxinteger(L,i) \ | |
(*(lua_Integer*) (lua_touserdata(L, i))) | |
#define ray_absindex(L, i) \ | |
((i) > 0 || (i) <= LUA_REGISTRYINDEX ? (i) : lua_gettop(L) + (i) + 1) | |
#define RAY_STARTED 1 | |
#define RAY_CLOSED 2 | |
typedef struct ray_fiber_s ray_fiber_t; | |
typedef struct ray_agent_s ray_agent_t; | |
typedef struct ray_evt_s ray_evt_t; | |
typedef struct ray_evq_s ray_evq_t; | |
typedef struct ray_buf_s ray_buf_t; | |
struct ray_buf_s { | |
size_t size; | |
size_t offs; | |
char* base; | |
}; | |
int ray_resume (ray_fiber_t*, int); | |
int ray_ready (ray_fiber_t*); | |
int ray_suspend(ray_fiber_t*); | |
ray_fiber_t* ray_current(lua_State*); | |
typedef enum { | |
RAY_UNKNOWN = -1, | |
RAY_CUSTOM, | |
RAY_ERROR, | |
RAY_READ, | |
RAY_WRITE, | |
RAY_CLOSE, | |
RAY_CONNECTION, | |
RAY_TIMER, | |
RAY_IDLE, | |
RAY_CONNECT, | |
RAY_SHUTDOWN, | |
RAY_WORK, | |
RAY_FS_CUSTOM, | |
RAY_FS_ERROR, | |
RAY_FS_OPEN, | |
RAY_FS_CLOSE, | |
RAY_FS_READ, | |
RAY_FS_WRITE, | |
RAY_FS_SENDFILE, | |
RAY_FS_STAT, | |
RAY_FS_LSTAT, | |
RAY_FS_FSTAT, | |
RAY_FS_FTRUNCATE, | |
RAY_FS_UTIME, | |
RAY_FS_FUTIME, | |
RAY_FS_CHMOD, | |
RAY_FS_FCHMOD, | |
RAY_FS_FSYNC, | |
RAY_FS_FDATASYNC, | |
RAY_FS_UNLINK, | |
RAY_FS_RMDIR, | |
RAY_FS_MKDIR, | |
RAY_FS_RENAME, | |
RAY_FS_READDIR, | |
RAY_FS_LINK, | |
RAY_FS_SYMLINK, | |
RAY_FS_READLINK, | |
RAY_FS_CHOWN, | |
RAY_FS_FCHOWN | |
} ray_type_t; | |
struct ray_evt_s { | |
ray_type_t type; | |
int info; | |
void* data; | |
}; | |
struct ray_evq_s { | |
size_t nput; | |
size_t nget; | |
size_t size; | |
ray_evt_t* evts; | |
}; | |
struct ray_agent_s { | |
ray_handle_t h; | |
ray_buf_t buf; | |
int flags; | |
int count; | |
ngx_queue_t fiber_queue; | |
ngx_queue_t queue; | |
void* data; | |
int ref; | |
}; | |
struct ray_fiber_s { | |
ray_req_t r; | |
lua_State* L; | |
int L_ref; | |
int flags; | |
uv_idle_t hook; | |
ngx_queue_t fiber_queue; | |
ngx_queue_t queue; | |
int ref; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment