Skip to content

Instantly share code, notes, and snippets.

@LSLeary
Last active June 24, 2025 06:31
Show Gist options
  • Save LSLeary/e139dfe025e2ed5a47040106b321dbc3 to your computer and use it in GitHub Desktop.
Save LSLeary/e139dfe025e2ed5a47040106b321dbc3 to your computer and use it in GitHub Desktop.
Transparently hand off jobs to work queues serviced by a dynamic number of workers
{-# LANGUAGE DerivingVia, LambdaCase, BlockArguments #-}
-- | Transparently hand off jobs to work queues serviced by a dynamic number
-- of workers.
--
-- Create a queue with 'newWorkQueue', attach workers with 'spawnWorker', and
-- submit actions with 'schedule'.
module WorkQueue (
WorkQueue,
newWorkQueue,
spawnWorker,
schedule,
) where
-- base
import Data.Functor ((<&>), ($>))
import Control.Monad (forever)
import Control.Exception (try, SomeException)
import Control.Exception (SomeAsyncException(..), fromException, mask, throwIO)
import Control.Concurrent (forkIO, ThreadId)
-- stm
import Control.Concurrent.STM (STM, atomically, throwSTM)
import Control.Concurrent.STM.TMVar
  (TMVar, newEmptyTMVar, writeTMVar, readTMVar)
import Control.Concurrent.STM.TChan (TChan, newTChanIO, readTChan, writeTChan)
-- | A queue of pending jobs, backed by a 'TChan' of 'Job's.
newtype WorkQueue = WorkQueue{ jobs :: TChan Job }
-- | Create a new work queue backed by a freshly allocated channel.
newWorkQueue :: IO WorkQueue
newWorkQueue = do
  chan <- newTChanIO
  pure (WorkQueue chan)
-- | Fork a worker thread that, forever, takes the next job off the queue and
-- runs it. Returns the 'ThreadId' of the forked thread.
spawnWorker :: WorkQueue -> IO ThreadId
spawnWorker WorkQueue{jobs} = forkIO (forever serveNext)
  where
    -- Block until a job is available, then execute it.
    serveNext = do
      job <- atomically (readTChan jobs)
      runJob job
-- | Enqueue an 'IO' action as a job on the given queue.
--
-- The outer transaction creates the job and writes it to the queue's channel.
-- It returns an inner 'STM' action that reads the job's result slot, yielding
-- the action's value or rethrowing the exception stored there.
schedule :: WorkQueue -> IO a -> STM (STM a)
schedule (WorkQueue chan) action = do
  (waitForResult, job) <- newJob action
  writeTChan chan job
  pure waitForResult
-- | A unit of work together with the slot its outcome is written to.
--
-- The result type @a@ does not appear in 'Job' itself, so jobs with different
-- result types can be carried by the same channel.
data Job where
  Job :: { work :: IO a
         -- The action to execute.
         , result :: {-# UNPACK #-} !(TMVar (Either SomeException a))
         -- Holds either the exception raised by 'work' or the value it
         -- produced, once the job has been run.
         } -> Job
-- | Wrap an action as a 'Job' with a fresh, empty result slot.
--
-- Returns the job along with an 'STM' action that reads the slot: it yields
-- the stored value, or rethrows the stored exception via 'throwSTM'.
newJob :: IO a -> STM (STM a, Job)
newJob action = do
  slot <- newEmptyTMVar
  let await = readTMVar slot >>= \case
        Left err  -> throwSTM err
        Right val -> pure val
  pure (await, Job{work = action, result = slot})
-- | Run a job's action and publish its outcome to the job's result slot.
--
-- Any exception raised while running the action is captured and stored, so
-- whoever awaits the job observes it. Asynchronous exceptions (for example
-- the one delivered by 'Control.Concurrent.killThread') are stored too, so
-- awaiters are not left waiting on an empty slot, but they are then rethrown:
-- a worker that is killed mid-job must actually terminate instead of
-- swallowing the request and moving on to the next job.
runJob :: Job -> IO ()
runJob Job{work,result} = mask \restore -> do
  -- Only the job itself runs with the caller's masking state restored, so no
  -- asynchronous exception can land between the job finishing and its
  -- outcome being published.
  esa <- try (restore work)
  -- 'writeTMVar' does not block, so this transaction does not retry.
  atomically (writeTMVar result esa)
  case esa of
    -- Propagate asynchronous exceptions after recording them.
    Left e | Just (SomeAsyncException _) <- fromException e -> throwIO e
    _ -> pure ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment