Last active
June 24, 2025 06:31
-
-
Save LSLeary/e139dfe025e2ed5a47040106b321dbc3 to your computer and use it in GitHub Desktop.
Transparently hand off jobs to work queues serviced by a dynamic number of workers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE GADTSyntax #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}

-- | Transparently hand off jobs to work queues serviced by a dynamic number
-- of workers.
--
-- The record puns and the GADT-style existential constructor used below are
-- enabled explicitly so the module does not depend on the ambient language
-- edition.
module WorkQueue
  ( WorkQueue
  , newWorkQueue
  , spawnWorker
  , schedule
  ) where
-- base
import Control.Concurrent (ThreadId, forkIO)
import Control.Exception
  (SomeAsyncException (..), SomeException, fromException, mask, throwIO, try)
import Control.Monad (forever)
import Data.Functor (($>), (<&>))

-- stm
import Control.Concurrent.STM (STM, atomically, throwSTM)
import Control.Concurrent.STM.TChan (TChan, newTChanIO, readTChan, writeTChan)
import Control.Concurrent.STM.TMVar
  (TMVar, newEmptyTMVar, readTMVar, writeTMVar)
-- | Handle to a queue of pending jobs shared by schedulers and workers.
newtype WorkQueue = WorkQueue
  { jobs :: TChan Job
    -- ^ Channel that 'schedule' writes jobs to and workers read jobs from.
  }
-- | Create a work queue backed by a freshly created channel.
--
-- No workers are started here; use 'spawnWorker' to have the queue serviced.
newWorkQueue :: IO WorkQueue
newWorkQueue = newTChanIO <&> WorkQueue
-- | Fork a worker thread that services the given queue.
--
-- The worker loops forever: it reads the next job from the queue's channel
-- and runs it with 'runJob'. The forked thread's 'ThreadId' is returned.
spawnWorker :: WorkQueue -> IO ThreadId
spawnWorker queue = forkIO (forever serveNext)
  where
    -- Take one job off the queue and run it.
    serveNext = do
      job <- atomically (readTChan (jobs queue))
      runJob job
-- | Enqueue an 'IO' action on the work queue.
--
-- The outer transaction wraps the action in a job (see 'newJob') and writes
-- it to the queue's channel. Its result is a second 'STM' action that waits
-- for the job's outcome, returning the value or rethrowing the exception
-- the job raised.
--
-- The job only becomes visible to workers once the scheduling transaction
-- commits, so the returned action has to be run in a later transaction.
schedule :: WorkQueue -> IO a -> STM (STM a)
schedule queue io =
  newJob io >>= \(awaitResult, job) ->
    writeTChan (jobs queue) job $> awaitResult
-- | An 'IO' action paired with the variable that receives its outcome.
--
-- The action's result type does not appear in 'Job' itself, so jobs
-- producing different types can travel through the same channel.
data Job where
  Job ::
    { work :: IO a
      -- ^ The action to execute.
    , result :: {-# UNPACK #-} !(TMVar (Either SomeException a))
      -- ^ Filled by 'runJob' with the action's value or the exception it
      -- raised.
    } ->
    Job
-- | Package an 'IO' action as a 'Job' with a fresh, empty result variable.
--
-- Returns the job together with an 'STM' action that reads the result
-- variable, yielding the value on 'Right' and rethrowing the stored
-- exception with 'throwSTM' on 'Left'.
newJob :: IO a -> STM (STM a, Job)
newJob work = do
  result <- newEmptyTMVar
  let awaitResult = either throwSTM pure =<< readTMVar result
  pure (awaitResult, Job{work, result})
-- | Run a job's action and publish its outcome to the job's result variable.
--
-- Any exception raised by the action is captured and stored as a 'Left', so
-- whoever awaits the result observes it.
--
-- The action itself runs with the caller's masking state restored, while
-- capturing and publishing the outcome happens under 'mask'. That way an
-- asynchronous exception cannot arrive between the action finishing and its
-- outcome being written, which would leave the awaiter blocked on a result
-- variable that is never filled.
--
-- If the captured exception is asynchronous (for example the thread was
-- killed mid-job), it is still published and then rethrown, so the calling
-- thread honours the request instead of silently swallowing it.
runJob :: Job -> IO ()
runJob Job{work, result} = mask \restore -> do
  outcome <- try (restore work)
  -- 'writeTMVar' never retries, so this transaction does not block and is
  -- therefore not a point where a masked thread can be interrupted.
  atomically (writeTMVar result outcome)
  case outcome of
    Left e
      | Just (SomeAsyncException _) <- fromException e -> throwIO e
    _ -> pure ()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment