Created
November 6, 2012 17:39
-
-
Save Deco/4026258 to your computer and use it in GitHub Desktop.
Multi-threaded LuaSocket with Lua Lanes example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--[[ socketlanes.lua
Multi-threaded LuaSocket with Lua Lanes example
===============================================
Depends on the following LuaSocket 2.0.2 patch:
http://www.net-core.org/dl/luasocket-2.0.2-acceptfd.patch
(via http://www.net-core.org/39/lua/patching-luasocket-to-make-it-compatible)
(provided at end of file)
Tested using Lua Lanes 3.4.0 (github, 2012-25-10)
Copyright (C) 2012 Declan White
Permission is hereby granted, free of charge, to any person obtaining a
copy of this software and associated documentation files (the "Software"),
to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
]]
--[[ | |
debug.sethook( | |
function(ev, evdata) | |
local info = debug.getinfo(2, 'Sn') | |
if ev == 'line' then | |
print(tostring(info.source)..":"..(tonumber(evdata) or "?")) | |
end | |
end, | |
'l' | |
) | |
]] | |
-- Resolve modules exclusively from the bundled ./lua/modules directory
-- (the default search paths are intentionally replaced, not extended).
package.path = "./lua/modules/?.lua"
-- The first character of package.config is the platform's directory
-- separator ("\" on Windows). Use it to pick the binary-module extension so
-- the script is not limited to Windows hosts; on Windows this yields the same
-- "./lua/modules/?.dll" value as before.
local cmodule_ext = package.config:sub(1, 1) == "\\" and "dll" or "so"
package.cpath = "./lua/modules/?." .. cmodule_ext
function main(...) | |
local lanes = require'lanes' | |
--os.execute("pause") | |
if lanes.configure then lanes.configure() end -- depends on version | |
local string_format = string.format
-- Linda + lock function used to serialise console output across lanes.
local printf_sync_linda = lanes.linda()
local printf_sync_lockfunc = lanes.genlock(printf_sync_linda, 'printf_sync_lock', 1)

--- Formats a message and writes it to stdout as one line, holding the
-- 'printf_sync_lock' lock while writing so concurrent callers do not interleave.
-- Arguments that are neither strings nor numbers (including nil) are passed
-- through `tostring` first, so they can be used with `%s` / `%q`.
-- @param fmt  `string.format` format string
-- @param ...  values substituted into `fmt`
local function printf_sync(fmt, ...)
    local argcount = select('#', ...)
    local arglist = {...}
    for arg_i = 1, argcount do
        local arg = arglist[arg_i]
        if type(arg) ~= 'string' and type(arg) ~= 'number' then
            arglist[arg_i] = tostring(arg)
        end
    end
    -- Build the line BEFORE taking the lock: if the format string is invalid,
    -- string.format raises, and raising while the lock is held would leave it
    -- acquired forever and block every other caller.
    local line = string_format(fmt, unpack(arglist, 1, argcount)) .. "\n"
    printf_sync_lockfunc(1)
    io.stdout:write(line)
    io.stdout:flush()
    printf_sync_lockfunc(-1)
end
-- Service state shared by the main lane: the clients launched so far and the
-- linda used to talk to the listen handler.
local myservice = {}
myservice.clientlist = {}
myservice.listenlinda = lanes.linda()

--- Client handler body (run through `myservice.clienthandler_lanegen`).
-- Wraps the given descriptor in a socket object, reads one byte at a time
-- until a NUL byte arrives or a receive fails, logs what was collected and
-- closes the socket.
-- @param client_sock_fd  file descriptor of the accepted connection
function myservice.clienthandler(client_sock_fd)
    -- Try to copy as little as possible to this lane: apart from the
    -- arguments, avoid touching anything defined "outside" this function.
    local socket = require'socket'
    -- Rebuild a socket object around the descriptor
    -- (fd = file descriptor; should be cross platform).
    local conn = socket.tcp(client_sock_fd)
    local peer = tostring(conn:getpeername())
    printf_sync("ClientHandler %q(%s): Initialised!", peer, client_sock_fd)

    local chunks = {}
    while true do
        local byte, failure = conn:receive(1)
        if byte == nil then
            printf_sync("ClientHandler %q(%s): Receive failure (%q)", peer, client_sock_fd, failure)
            break
        end
        if byte == '\0' then
            -- NUL terminates the message.
            break
        end
        chunks[#chunks + 1] = byte
    end

    printf_sync("ClientHandler %q(%s): Buffer = %q", peer, client_sock_fd, table.concat(chunks))
    conn:close()
    printf_sync("ClientHandler %q(%s): Closed", peer, client_sock_fd)
end

-- Libraries opened in each client lane ('package' provides `require`);
-- everything is opened when running under LuaJIT.
local clienthandler_libs = jit and '*' or 'package,io,math,string,table,debug'
myservice.clienthandler_lanegen = lanes.gen(clienthandler_libs, myservice.clienthandler)
--- Listen handler body (run through `myservice.listenhandler_lanegen`).
-- Binds a non-blocking server socket on port 1234, announces readiness with a
-- 'listenhandler_ready' message, then services linda messages:
--   * 'listenhandler_poll' - try to accept one pending connection and, on
--     success, publish its file descriptor under 'newclient_sock_fd';
--   * 'listenhandler_halt' - leave the loop.
-- NOTE(review): the key/value order returned by `linda:receive` is assumed to
-- be (key, value), as in Lua Lanes 3.4.0; see the version note in the main
-- test driver.
-- @param listenlinda  linda shared with the lane that launched this handler
-- @return true once halted
myservice.listenhandler = function(listenlinda)
    local socket = require'socket'
    printf_sync("ListenHandler: Binding")
    local listensock = assert(socket.bind('*', 1234))
    -- Close the listening socket however this lane ends.
    set_finalizer(function(err)
        listensock:close()
    end)
    listensock:settimeout(0) -- non-blocking
    printf_sync("ListenHandler: Bound")
    listenlinda:send(nil, 'listenhandler_ready', true)
    while true do
        local lindakey = listenlinda:receive(nil, 'listenhandler_halt', 'listenhandler_poll')
        if lindakey == 'listenhandler_poll' then
            local newclient_sock_fd, accept_err = listensock:acceptfd()
            if newclient_sock_fd ~= nil then
                -- Although it'd be nice, we can't construct a socket object for this
                -- new connection here. If we did, its __gc metamethod would close it
                -- when this lane exits, meaning any clienthandler lanes still running
                -- wouldn't be able to finish. The peer name is therefore unknown at
                -- this point and is logged as "nil".
                local newclient_peername = nil
                printf_sync("ListenHandler: Client %q(%s) connected", tostring(newclient_peername), newclient_sock_fd)
                listenlinda:send(nil, 'newclient_sock_fd', newclient_sock_fd)
                printf_sync("ListenHandler: Client %q(%s) sent to linda", tostring(newclient_peername), newclient_sock_fd)
            elseif accept_err ~= 'timeout' then
                -- 'timeout' just means nothing was pending on the non-blocking
                -- socket; anything else is a real failure worth reporting.
                printf_sync("ListenHandler: Accept failure (%q)", accept_err)
            end
        elseif lindakey == 'listenhandler_halt' then
            -- You *could* make `listensock` blocking and just force-cancel the lane,
            -- but that doesn't seem like good practice.
            -- There may be a method to interrupt `accept`/`acceptfd` using a linda,
            -- but I'm not aware of any easy way to do it.
            break
        end
    end
    return true
end
-- Libraries opened in the listen lane ('package' provides `require`);
-- everything is opened when running under LuaJIT.
myservice.listenhandler_lanegen = lanes.gen(jit and '*' or 'package,string,math,io', myservice.listenhandler)
-- Testing!
-- Driver: launch the listen handler, connect a few local clients, hand every
-- accepted descriptor to a client-handler lane, then shut all lanes down.
print('yay')
local socket = require'socket'
print('yay')
local lanedebuglist = {}
local debug_hasproblem, debug_err = false, nil
-- Short blocking wait used instead of a zero timeout, so the loops below do
-- not spin at full speed while waiting for the listen handler.
local linda_wait_secs = 0.1

-- Start server listening
printf_sync("Main: Launching ListenHandler lane")
myservice.listenhandler_lane = myservice.listenhandler_lanegen(myservice.listenlinda)
print('yay')
table.insert(lanedebuglist, myservice.listenhandler_lane)
printf_sync("Main: ListenHandler lane launched: %q", tostring(myservice.listenhandler_lane.status))

-- Wait until the listen handler reports that it is bound. If the lane stops
-- first (e.g. the bind assertion failed) give up instead of waiting forever.
repeat
    local lindakey, listenhandler_ready = myservice.listenlinda:receive(linda_wait_secs, 'listenhandler_ready')
    if not listenhandler_ready then
        local status = myservice.listenhandler_lane.status
        if status == 'error' or status == 'cancelled' or status == 'killed' or status == 'done' then
            debug_hasproblem = true
            debug_err = ("ListenHandler lane stopped before becoming ready (status: %s)"):format(status)
        end
    end
until listenhandler_ready or debug_hasproblem

-- Start polling timer (check for new connection every half a second)
--lanes.timer(myservice.listenlinda, 'listenhandler_poll', 0, 0.5)

local testcount = 5
if not debug_hasproblem then
    printf_sync("Main: ListenHandler lane ready")
    -- Spawn some clients to send data.
    for i = 1, testcount do
        local sendersock, err = socket.connect('127.0.0.1', 1234)
        if sendersock ~= nil then
            sendersock:send(("Hello from client #%s!\0"):format(i))
            sendersock:close()
            printf_sync("Client #%s: data sent", i)
        else
            debug_hasproblem = true
            debug_err = err
        end
    end
end

-- Ask the listen handler to poll until every test client has been handed to a
-- client-handler lane, or until something goes wrong.
while testcount > 0 and not debug_hasproblem do
    myservice.listenlinda:send('listenhandler_poll', socket.gettime())
    local lindakey, newclient_sock_fd = myservice.listenlinda:receive(linda_wait_secs, 'newclient_sock_fd')
    -- (Note: If you don't use Lua Lanes v3.4.0 or later, swap `lindakey` and `newclient_sock_fd` in the above line!)
    if lindakey ~= nil and newclient_sock_fd ~= nil then
        print('yep!')
        printf_sync("Main: New client available")
        local newclient = {
            sock_fd = newclient_sock_fd,
        }
        newclient.lane = myservice.clienthandler_lanegen(newclient_sock_fd)
        table.insert(lanedebuglist, newclient.lane)
        printf_sync("Main: Client handler launched")
        table.insert(myservice.clientlist, newclient)
        testcount = testcount - 1
    else
        print('nope')
    end
    -- Stop as soon as any lane has failed; otherwise this loop could never end.
    for lane_i = 1, #lanedebuglist do
        if lanedebuglist[lane_i].status == 'error' then
            debug_hasproblem = true
            debug_err = debug_err or ("lane #%d is in the 'error' state"):format(lane_i)
        end
    end
end

-- Always shut the lanes down, including after a failure, so the listen
-- handler is not left waiting on its linda.
printf_sync("Main: Canceling lanes")
repeat
    myservice.listenlinda:send(nil, 'listenhandler_halt', true)
    myservice.listenhandler_lane:join(5) -- wait a few seconds for it to close properly
until myservice.listenhandler_lane:cancel()
for client_i = 1, #myservice.clientlist do
    local client = myservice.clientlist[client_i]
    client.lane:join(5) -- give the client handler a few seconds to finish
    client.lane:cancel()
end
printf_sync("Main: Cancelled lanes")

-- testing!
if debug_hasproblem then -- error, propagate it to main
    printf_sync("problem! %q", tostring(debug_err))
    for lane_i = 1, #lanedebuglist do
        local lane = lanedebuglist[lane_i]
        if lane.status == 'error' then
            -- Indexing a failed lane re-raises its error in this lane.
            local _ = lane[1]
        end
    end
end
printf_sync("Main: Done!")
return 0
end | |
return main(...) | |
--[[ | |
--- luasocket-2.0.2/src/tcp.c 2007-10-15 06:21:05.000000000 +0200 | |
+++ luasocket-2.0.2.new/src/tcp.c 2009-08-24 23:58:47.000000000 +0200 | |
@@ -30,6 +30,7 @@ | |
static int meth_shutdown(lua_State *L); | |
static int meth_receive(lua_State *L); | |
static int meth_accept(lua_State *L); | |
+static int meth_acceptfd(lua_State *L); | |
static int meth_close(lua_State *L); | |
static int meth_setoption(lua_State *L); | |
static int meth_settimeout(lua_State *L); | |
@@ -42,6 +43,7 @@ | |
{"__gc", meth_close}, | |
{"__tostring", auxiliar_tostring}, | |
{"accept", meth_accept}, | |
+ {"acceptfd", meth_acceptfd}, | |
{"bind", meth_bind}, | |
{"close", meth_close}, | |
{"connect", meth_connect}, | |
@@ -186,6 +188,27 @@ | |
} | |
/*-------------------------------------------------------------------------*\ | |
+* Waits for and returns a client object attempting connection to the | |
+* server object | |
+\*-------------------------------------------------------------------------*/ | |
+static int meth_acceptfd(lua_State *L) | |
+{ | |
+ p_tcp server = (p_tcp) auxiliar_checkclass(L, "tcp{server}", 1); | |
+ p_timeout tm = timeout_markstart(&server->tm); | |
+ t_socket sock; | |
+ int err = socket_accept(&server->sock, &sock, NULL, NULL, tm); | |
+ /* if successful, push client socket */ | |
+ if (err == IO_DONE) { | |
+ lua_pushnumber(L, sock); | |
+ return 1; | |
+ } else { | |
+ lua_pushnil(L); | |
+ lua_pushstring(L, socket_strerror(err)); | |
+ return 2; | |
+ } | |
+} | |
+ | |
+/*-------------------------------------------------------------------------*\ | |
* Binds an object to an address | |
\*-------------------------------------------------------------------------*/ | |
static int meth_bind(lua_State *L) | |
@@ -316,12 +339,19 @@ | |
static int global_create(lua_State *L) | |
{ | |
t_socket sock; | |
- const char *err = inet_trycreate(&sock, SOCK_STREAM); | |
+ const char *err = NULL; | |
+ int fd = luaL_optnumber(L, 1, -1); | |
+ if (fd < 1) | |
+ err = inet_trycreate(&sock, SOCK_STREAM); | |
+ else | |
+ sock = fd; | |
/* try to allocate a system socket */ | |
if (!err) { | |
/* allocate tcp object */ | |
p_tcp tcp = (p_tcp) lua_newuserdata(L, sizeof(t_tcp)); | |
- /* set its type as master object */ | |
+ if (fd >= 1) | |
+ auxiliar_setclass(L, "tcp{client}", -1); | |
+ else | |
auxiliar_setclass(L, "tcp{master}", -1); | |
/* initialize remaining structure fields */ | |
socket_setnonblocking(&sock); | |
]] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment