Last active
April 1, 2024 22:36
-
-
Save Clemapfel/657c02770d5cd77e87a8a25f18a7669e to your computer and use it in GitHub Desktop.
ThreadPool in love2D
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- licensed MIT, created by https://github.com/clemapfel | |
-- make sure the shared `rt` namespace exists before attaching anything to it
if rt == nil then
    rt = {}
end

--- @class rt.ThreadPool
--- @brief pool of worker threads that share one task channel and one result channel
rt.ThreadPool = {}

--- @brief constructor, creates the pool state only; no thread is created here
--- @param n_threads Number number of threads, default: 1 (the absolute value is used)
--- @return rt.ThreadPool
function rt.ThreadPool:new(n_threads)
    if n_threads == nil then
        n_threads = 1
    end

    -- every instance resolves missing fields (i.e. its methods) through rt.ThreadPool
    local pool = setmetatable({}, { __index = rt.ThreadPool })

    pool._threads = {}        -- filled by :startup()
    pool._futures = {}        -- pending futures, keyed by future id
    pool._n_threads = math.abs(n_threads)
    pool._main_to_worker = love.thread.newChannel()  -- tasks: main -> workers
    pool._worker_to_main = love.thread.newChannel()  -- results: workers -> main

    return pool
end

-- allow `rt.ThreadPool(n)` as shorthand: __call receives the class table as
-- first argument, which lines up with the implicit `self` of `:new`
setmetatable(rt.ThreadPool, { __call = rt.ThreadPool.new })
--- @class rt.MessageType
--- @brief tags identifying what a message asks for (main -> worker) or
--- reports (worker -> main); each value equals its own key
rt.MessageType = {
    -- control
    KILL       = "KILL",        -- worker leaves its main loop

    -- audio loading
    LOAD_AUDIO = "LOAD_AUDIO",  -- request: data is the path of the audio file
    AUDIO_DONE = "AUDIO_DONE",  -- reply: data is the loaded sound data

    -- debug printing
    PRINT      = "PRINT",       -- request: data is the value to print
    PRINT_DONE = "PRINT_DONE"   -- reply: data echoes the printed value
}
--- @class rt.ThreadPool.Message
--- @brief plain table exchanged between main and worker threads; it tells the
--- receiving side what to do (or what was done)
--- @param type String one of rt.MessageType
--- @param id Number id of the future this message belongs to
--- @param data any payload, its meaning depends on `type`
--- @return Table with the fields `type`, `id` and `data`
function rt.ThreadPool.Message(type, id, data)
    local message = {}
    message.type = type
    message.id = id
    message.data = data
    return message
end
--- @class rt.ThreadPool.Future
--- @brief handle returned for every request made to the thread pool. Once the
--- promised value is available it is automatically sent to the future
--- @param id Number id shared with the request message
--- @return rt.ThreadPool.Future
function rt.ThreadPool.Future(id)
    local future = {}

    future.id = id
    future.result = nil          -- filled in once the value was delivered
    future.is_delivered = false

    --- @return Boolean true once a result was delivered
    function future:is_ready()
        return self.is_delivered
    end

    --- @return any the delivered value, nil while not ready
    function future:get_result()
        return self.result
    end

    return future
end
--- running internal IDs of futures, private
rt.ThreadPool.FUTURE_ID = 0

--- worker source code, read once when this file is loaded and handed to
--- love.thread.newThread for every worker
do
    -- location of the worker script, change this if the file is moved
    local worker_path = "common/thread_pool_worker.lua"

    -- on failure love.filesystem.read returns nil plus a reason, keep the
    -- reason so the error below says why the read failed
    local source, reason = love.filesystem.read(worker_path)
    rt.ThreadPool._thread_source = source

    if rt.ThreadPool._thread_source == nil then
        error(
            "In thread_pool: path to thread pool worker code is invalid, change the file location in thread_pool.lua"
            .. " (tried `" .. worker_path .. "`: " .. tostring(reason) .. ")"
        )
    end
end
--- @brief create `_n_threads` worker threads and start them, they will run
--- continuously from this point on
function rt.ThreadPool:startup()
    local source = rt.ThreadPool._thread_source

    for thread_id = 1, self._n_threads do
        local worker = love.thread.newThread(source)
        self._threads[thread_id] = { thread = worker }

        -- the worker script reads these four values positionally
        worker:start(
            self._main_to_worker,  -- main_to_worker
            self._worker_to_main,  -- worker_to_main
            rt.MessageType,        -- rt.MessageType
            thread_id              -- THREAD_ID
        )
    end
end
--- @brief [internal] queue one message of the given type on the shared
--- main-to-worker channel and register a future for its answer
--- @param type String one of rt.MessageType
--- @param data any payload handed to the worker
--- @return rt.ThreadPool.Future resolved by :update once the answer arrived
function rt.ThreadPool:_send_message(type, data)
    -- ids come from one counter shared by all pools
    local id = rt.ThreadPool.FUTURE_ID + 1
    rt.ThreadPool.FUTURE_ID = id

    local future = rt.ThreadPool.Future(id)
    self._futures[id] = future

    self._main_to_worker:push(rt.ThreadPool.Message(type, id, data))
    return future
end
--- @brief ask the thread pool to print a message thread-side
--- @param message any value to print, the worker converts it with tostring
--- @return rt.ThreadPool.Future<String>
function rt.ThreadPool:request_debug_print(message)
    local future = self:_send_message(rt.MessageType.PRINT, message)
    return future
end
--- @brief ask the thread pool to load an audio file
--- @param path_to_audio String path handed to love.sound.newSoundData thread-side
--- @return rt.ThreadPool.Future<love.SoundData>
function rt.ThreadPool:request_load_sound_data(path_to_audio)
    local future = self:_send_message(rt.MessageType.LOAD_AUDIO, path_to_audio)
    return future
end
--- @brief drain the worker-to-main channel and hand every received result to
--- the future waiting for it
--- @param delta Number frame time, currently unused
--- @return Table list of the futures that became ready during this call
function rt.ThreadPool:update(delta)
    local delivered = {}
    local inbox = self._worker_to_main

    while inbox:getCount() > 0 do
        local reply = inbox:pop()
        local id = reply.id

        local pending = self._futures[id]
        pending.result = reply.data
        pending.is_delivered = true

        delivered[#delivered + 1] = pending

        -- the pool no longer tracks a future once it was resolved
        self._futures[id] = nil
    end

    return delivered
end
--- @brief safely shutdown threadpool, waits for all tasks to finish
function rt.ThreadPool:shutdown()
    -- one KILL per worker; they are queued behind any tasks already pushed
    local n_workers = #self._threads
    for _ = 1, n_workers do
        local kill = {}
        kill.type = rt.MessageType.KILL
        kill.data = nil
        kill.id = -1
        self._main_to_worker:push(kill)
    end

    -- block until every worker thread has ended
    for _, entry in pairs(self._threads) do
        entry.thread:wait()
    end
end
--- @brief shutdown threadpool as fast as possible: all tasks that were queued
--- but not yet picked up by a worker are dropped, their futures never become
--- ready. A task a worker is already executing still runs to completion.
---
--- `Thread:kill` no longer exists in the LÖVE versions that provide
--- `love.thread.newChannel`, so a running thread cannot be terminated from
--- outside; calling it raised "attempt to call method 'kill'". Emptying the
--- task queue before asking the workers to stop is the closest equivalent.
function rt.ThreadPool:force_shutdown()
    -- discard every pending task so the KILL messages are next in line
    self._main_to_worker:clear()

    -- one KILL per worker, same message shape :shutdown uses
    for _ = 1, #self._threads do
        self._main_to_worker:push({
            type = rt.MessageType.KILL,
            data = nil,
            id = -1
        })
    end

    -- block until every worker has left its main loop
    for _, t in pairs(self._threads) do
        t.thread:wait()
    end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "love.image" | |
require "love.audio" | |
require "love.sound" | |
require "love.timer" | |
-- globals handed from main during :start(), in the order
-- main_to_worker, worker_to_main, rt.MessageType, THREAD_ID
local args = {...}
main_to_worker = args[1]
worker_to_main = args[2]
rt = {}; rt.MessageType = args[3]
THREAD_ID = args[4]

-- main loop: block until a message arrives, handle it, repeat until KILL
while true do
    -- retrieve message and handle it
    local message = main_to_worker:demand()

    -- a message that is not a table has no type; treat it like an unknown
    -- type so it is reported by the final branch instead of failing on index
    local message_type = nil
    if type(message) == "table" then
        message_type = message.type
    end

    if message_type == rt.MessageType.KILL then
        -- Message Type #1: KILL This will safely shutdown the thread
        break
    elseif message_type == rt.MessageType.PRINT then
        -- Message Type #2: PRINT this will print to the console from the thread, useful for debugging
        print("Thread #" .. tostring(THREAD_ID) .. " prints: " .. tostring(message.data))
        worker_to_main:push({
            id = message.id,
            type = rt.MessageType.PRINT_DONE,
            data = message.data
        })
    elseif message_type == rt.MessageType.LOAD_AUDIO then
        -- Message Type #3: LOAD_AUDIO receives a path as message.data, loads it into memory, then sends the result back to main
        local res = love.sound.newSoundData(message.data)
        worker_to_main:push({
            id = message.id,
            type = rt.MessageType.AUDIO_DONE,
            data = res
        })
    elseif message_type == "TODO" then
        -- MESSAGE Type #4: TODO add your own message type and thread-side behavior here
        -- todo
    else
        -- tostring keeps this report intact when the type is nil or not a
        -- string; plain concatenation would raise an unrelated error instead
        error("In rt.ThreadPool.ThreadWorker: unhandled message type `" .. tostring(message_type) .. "`")
    end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "thread_pool" | |
--- @brief set up the window, start the thread pool and queue all test tasks
function love.load()
    local window_flags = {
        vsync = 1,
        msaa = 8,
        stencil = true,
        resizable = true
    }
    love.window.setMode(1920 / 2, 1080 / 2, window_flags)
    love.window.setTitle("ThreadPool Test")

    -- tasks to run multi-threaded: print 4 messages
    test_messages = {
        "First Message",
        "Second Message",
        "Third Message",
        "Fourth Message",
    }

    -- and load one music file
    test_audio = {
        "assets/music/test_music_03.mp3"
    }

    -- one future per queued task is collected here, love.update polls them
    futures = {}

    -- create the thread pool and start its workers
    thread_pool = rt.ThreadPool(8)
    thread_pool:startup()

    -- queue tasks for the thread pool
    for _, message in pairs(test_messages) do
        local future = thread_pool:request_debug_print(message)
        futures[#futures + 1] = future
    end

    for _, path in pairs(test_audio) do
        local future = thread_pool:request_load_sound_data(path)
        futures[#futures + 1] = future
    end
end
--- @brief poll the thread pool once per frame, report newly delivered futures
--- and quit once every queued task has been answered
love.update = function()
    local delta = love.timer.getDelta()

    -- one or more times per frame, the threadpool needs to be updated
    -- this will set the values of our `futures`, if their value becomes available
    local ready_futures = thread_pool:update(delta)

    -- we can use the futures stored in `futures`, but `update` also returns
    -- all futures that became ready during this call
    for _, future in pairs(ready_futures) do
        assert(future:is_ready())
        print("Main Received Future #" .. tostring(future.id) .. ":", future:get_result(), "\n")
    end

    -- if all futures are done, safely shutdown thread pool, then love
    local is_done = true
    for _, future in pairs(futures) do
        if not future:is_ready() then
            is_done = false
            break
        end
    end

    if is_done then
        -- `println` is not defined by Lua, LÖVE or these files; use `print`
        print("succesfully received all futures")

        -- stop the workers before quitting, as announced above; every task
        -- was already answered, so waiting for the threads does not stall
        thread_pool:shutdown()
        love.event.quit()
    end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
As an example of how to extend the functionality, we will be adding a function that lets the thread read a file into a string thread side.
We first add a new message type (thread_pool.lua)
We then add a function to `ThreadPool` to request this behavior easily.
Then, in `thread_pool_worker.lua`, we add a new branch to the message handling that handles our new `READ_FILE` request message.
That's it, now calling `thread_pool:request_read_file("path_to_file.txt")` will read the file into a string thread-side, then return a future whose `get_result` will return the read string when it is ready.