Skip to content

Instantly share code, notes, and snippets.

@st235
Created January 5, 2025 14:04
Show Gist options
  • Save st235/ae8c2aaa48ad107ab9e0956ffc2c146f to your computer and use it in GitHub Desktop.
Save st235/ae8c2aaa48ad107ab9e0956ffc2c146f to your computer and use it in GitHub Desktop.
Programming in Lua. Exercise 31.3
// Exercise 31.3:
// Implement in the lproc library a non-blocking send operation.
#define LUA_COMPAT_MODULE
#define LUA_COMPAT_APIINTCASTS
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
#include <pthread.h>
#include "lua.hpp"
namespace {
/* Control block describing one lproc process; stored as userdata in the
   registry of the Lua state it belongs to (see luaopen_lproc). */
struct Proc {
  lua_State *L;          /* Lua state this control block belongs to */
  pthread_t thread;      /* thread that initialised the control block */
  pthread_cond_t cond;   /* condition variable the process sleeps on */
  const char *channel;   /* channel being waited on; NULL when not waiting */
  Proc *previous;        /* previous node in the circular waiting list */
  Proc *next;            /* next node in the circular waiting list */
};
/* Head of the circular list of processes waiting to send; NULL when empty.
   Searched by ll_receive. */
static Proc *waitsend = NULL;
/* Head of the circular list of processes waiting to receive; NULL when empty.
   Searched by ll_send, appended to by ll_receive. */
static Proc *waitreceive = NULL;
/* Mutex held by ll_send/ll_receive while the waiting lists are inspected or
   modified; also the mutex passed to pthread_cond_wait in waitonlist. */
static pthread_mutex_t kernel_access = PTHREAD_MUTEX_INITIALIZER;
/*
 * Prints `message` followed by a newline to standard output and terminates
 * the process with std::abort().
 *
 * `message` may be NULL (e.g. when it comes from lua_tostring on a
 * non-string error object); a placeholder text is printed in that case,
 * because streaming a null `const char*` is undefined behaviour.
 *
 * Never returns; the bool return type is kept only for interface
 * compatibility.
 */
bool AbortWithMessage(const char* message) {
  std::cout << (message != NULL ? message : "(no error message)") << std::endl;
  std::abort();
}
/*
 * Loads the Lua chunk stored in `filename` and runs it in `state`.
 * Returns true only when both loading and execution succeed; the chunk is
 * called with no arguments and all of its results are discarded.
 */
bool LoadFile(lua_State* state, const std::string& filename) {
  if (luaL_loadfile(state, filename.c_str()) != 0) {
    return false;  /* could not load the chunk */
  }
  return lua_pcall(state, 0, 0, 0) == 0;
}
/*
 * Returns the control block stored under the "_SELF" key of the registry
 * of `L`. The Lua stack is left as it was found.
 */
Proc* getself(lua_State *L) {
  lua_getfield(L, LUA_REGISTRYINDEX, "_SELF");
  Proc *const self = static_cast<Proc*>(lua_touserdata(L, -1));
  lua_pop(L, 1);  /* drop the userdata pushed by lua_getfield */
  return self;
}
/*
 * Transfers every value above index 1 (the channel name) from the stack of
 * `send` to the stack of `rec`, then pops them from `send`.
 *
 * `send` and `rec` are independent Lua states, so values cannot be handed
 * over with lua_xmove (it is only valid between threads of the same state).
 * Values are copied by type instead:
 *   - nil, booleans, numbers, strings and light userdata are copied;
 *   - any other type (tables, functions, full userdata, coroutines) cannot
 *     cross state boundaries and is delivered as nil.
 *
 * If `rec` cannot grow its stack to hold the values, nothing is transferred
 * and the stack of `send` is left untouched.
 */
void movevalues(lua_State* send, lua_State* rec) {
  const int top = lua_gettop(send);
  if (top < 2) {
    return;  /* nothing besides the channel name */
  }
  if (!lua_checkstack(rec, top - 1)) {
    return;  /* receiver has no room; non-raising check on purpose */
  }
  for (int i = 2; i <= top; ++i) {
    switch (lua_type(send, i)) {
      case LUA_TBOOLEAN:
        lua_pushboolean(rec, lua_toboolean(send, i));
        break;
      case LUA_TNUMBER:
#if LUA_VERSION_NUM >= 503
        if (lua_isinteger(send, i)) {  /* keep the integer subtype */
          lua_pushinteger(rec, lua_tointeger(send, i));
          break;
        }
#endif
        lua_pushnumber(rec, lua_tonumber(send, i));
        break;
      case LUA_TSTRING: {
        size_t len = 0;
        const char *s = lua_tolstring(send, i, &len);
        lua_pushlstring(rec, s, len);  /* copies, keeps embedded zeros */
        break;
      }
      case LUA_TLIGHTUSERDATA:
        lua_pushlightuserdata(rec, lua_touserdata(send, i));
        break;
      default:  /* nil and every non-transferable type */
        lua_pushnil(rec);
        break;
    }
  }
  lua_settop(send, 1);  /* pop the transferred values, keep the channel */
}
/*
 * Looks through the circular list whose head is `*list` for the first
 * process waiting on `channel`. A matching process is unlinked from the
 * list and returned; NULL is returned when the list is empty or holds no
 * match. `*list` is updated when the removed node was the head.
 */
Proc* searchmatch(const char *channel, Proc **list) {
  Proc *const head = *list;
  if (head == NULL) {
    return NULL;  /* empty list */
  }
  Proc *cur = head;
  for (;;) {
    if (strcmp(channel, cur->channel) == 0) {
      Proc *const after = cur->next;
      if (cur == head) {
        /* removing the head: list becomes empty or starts at the successor */
        if (after == cur) {
          *list = NULL;
        } else {
          *list = after;
        }
      }
      /* unlink the node from its neighbours */
      cur->previous->next = after;
      after->previous = cur->previous;
      return cur;
    }
    cur = cur->next;
    if (cur == head) {
      return NULL;  /* walked the whole ring without a match */
    }
  }
}
/*
 * Appends the calling process (the control block of `L`) to the end of the
 * circular list `*list`, records `channel` as the channel it waits on, and
 * blocks on its condition variable until another process clears its
 * `channel` field. `kernel_access` is the mutex passed to
 * pthread_cond_wait, so the caller must hold it.
 */
void waitonlist(lua_State* L, const char* channel, Proc** list) {
  Proc *const self = getself(L);
  Proc *const head = *list;
  if (head != NULL) {
    /* insert just before the head, i.e. at the tail of the ring */
    Proc *const tail = head->previous;
    self->previous = tail;
    self->next = head;
    tail->next = self;
    head->previous = self;
  } else {
    /* first element: a ring of one */
    self->previous = self;
    self->next = self;
    *list = self;
  }
  self->channel = channel;
  do {
    /* loop guards against wake-ups that did not clear `channel` */
    pthread_cond_wait(&self->cond, &kernel_access);
  } while (self->channel);
}
/*
 * lproc.send(channel, ...) -- non-blocking send.
 *
 * If a process is already waiting to receive on `channel`, the extra
 * arguments are handed to it and it is woken up. Otherwise the message is
 * dropped and the call returns immediately.
 *
 * Returns one Lua boolean: true when a receiver took the message, false
 * when nobody was waiting. Callers that ignore the result are unaffected.
 */
int ll_send(lua_State* L) {
  const char *channel = luaL_checkstring(L, 1);
  pthread_mutex_lock(&kernel_access);
  Proc *receiver = searchmatch(channel, &waitreceive);
  if (receiver != NULL) {              /* found a matching receiver? */
    movevalues(L, receiver->L);        /* move values to receiver */
    receiver->channel = NULL;          /* mark receiver as not waiting */
    pthread_cond_signal(&receiver->cond);  /* wake it up */
  }
  pthread_mutex_unlock(&kernel_access);
  /* only the pointer value is inspected here, never dereferenced */
  lua_pushboolean(L, receiver != NULL);
  return 1;
}
/*
 * lproc.receive(channel) -- blocking receive.
 *
 * Takes the values from a sender already waiting on `channel`, or, when
 * there is none, waits on the receive list until a sender delivers.
 * Returns every value that ended up on the stack above the channel name.
 */
static int ll_receive (lua_State *L) {
  const char *channel = luaL_checkstring(L, 1);
  lua_settop(L, 1);  /* keep only the channel name */
  pthread_mutex_lock(&kernel_access);
  Proc *sender = searchmatch(channel, &waitsend);
  if (sender == NULL) {
    /* no sender yet: sleep until one shows up */
    waitonlist(L, channel, &waitreceive);
  } else {
    movevalues(sender->L, L);            /* get values from sender */
    sender->channel = NULL;              /* mark sender as not waiting */
    pthread_cond_signal(&sender->cond);  /* wake it up */
  }
  pthread_mutex_unlock(&kernel_access);
  /* everything except the channel name is a result */
  return lua_gettop(L) - 1;
}
// Forward declaration for ll_thread.
int luaopen_lproc(lua_State *L);
void* ll_thread(void *arg) {
lua_State* L = (lua_State*) arg;
luaL_openlibs(L);
luaopen_lproc(L);
if (lua_pcall(L, 0, 0, 0) != 0) {
fprintf(stderr, "thread error: %s", lua_tostring(L, -1));
}
pthread_cond_destroy(&getself(L)->cond);
lua_close(L);
return NULL;
}
int ll_start(lua_State *L) {
pthread_t thread;
luaL_checktype(L, 1, LUA_TFUNCTION);
lua_State *L1 = luaL_newstate();
if (L1 == NULL) {
luaL_error(L, "unable to create new state");
}
lua_xmove(L, L1, 1);
if (pthread_create(&thread, NULL, ll_thread, L1) != 0) {
luaL_error(L, "unable to create new thread");
}
pthread_detach(thread);
return 0;
}
/*
 * lproc.exit() -- terminates the calling thread through pthread_exit.
 * The Lua state is not used; the trailing return only satisfies the
 * lua_CFunction signature.
 */
static int ll_exit (lua_State *L) {
  (void)L;  /* unused */
  pthread_exit(NULL);
  return 0;
}
/* Functions registered in the "lproc" table by luaopen_lproc, as
   Lua name -> C function pairs; the {NULL, NULL} entry ends the list. */
static const luaL_Reg ll_funcs[] = {
{"start", ll_start},
{"send", ll_send},
{"receive", ll_receive},
{"exit", ll_exit},
{NULL, NULL}
};
/*
 * Sets up lproc in the state `L`:
 *   - allocates this state's Proc control block as userdata and stores it
 *     in the registry under "_SELF";
 *   - initialises the block (owning state, current thread, no channel,
 *     fresh condition variable);
 *   - registers the library functions in the global table "lproc".
 * Leaves the stack as it was found and returns 0.
 */
int luaopen_lproc(lua_State *L) {
  Proc *const block = static_cast<Proc*>(lua_newuserdata(L, sizeof(Proc)));
  /* the registry keeps the userdata alive after it is popped here */
  lua_setfield(L, LUA_REGISTRYINDEX, "_SELF");

  block->channel = NULL;  /* not waiting on anything yet */
  block->thread = pthread_self();
  block->L = L;
  pthread_cond_init(&block->cond, NULL);

  luaL_newlib(L, ll_funcs);
  lua_setglobal(L, "lproc");
  return 0;
}
} // namespace
int main(int argc, char* argv[]) {
if (argc < 2) {
std::cout << "Please, specify lua script file to run." << std::endl;
return 0;
}
lua_State* L = luaL_newstate();
luaL_openlibs(L);
luaopen_lproc(L);
if (!LoadFile(L, std::string(argv[1]))) {
AbortWithMessage(lua_tostring(L, -1));
return -1;
}
return 0;
}
-- Receiver: waits for a message on "channel1" and prints the values it carries.
function thread1()
print("Print from thread 1:", lproc.receive("channel1"))
end
-- Sender: announces itself, then sends three values on "channel1".
-- The send does not wait, so the values are delivered only if a receiver
-- is already waiting on the channel.
function thread2()
print("Sending from thread 2")
lproc.send("channel1", false, "hello world", true)
end
-- Start the receiver and the sender, each in its own thread.
lproc.start(thread1)
lproc.start(thread2)
-- Terminate only the main thread so the started threads can keep running.
lproc.exit()
-- NOTE: lproc.exit() ends the calling thread, so this line is not reached;
-- it does not appear in the expected output.
print("Wrapping up commincation channel")
Sending from thread 2
Print from thread 1: false hello world true
### Or the output below, because send no longer waits for a receiver (the message is dropped if thread 1 is not yet waiting):
Sending from thread 2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment