Skip to content

Instantly share code, notes, and snippets.

@LSLeary
Last active September 13, 2025 22:21
Show Gist options
  • Save LSLeary/e139dfe025e2ed5a47040106b321dbc3 to your computer and use it in GitHub Desktop.
Save LSLeary/e139dfe025e2ed5a47040106b321dbc3 to your computer and use it in GitHub Desktop.
Transparently hand off jobs to work queues serviced by a dynamic number of workers capable of sending progress reports
{-# LANGUAGE GADTs, RankNTypes, LambdaCase, BlockArguments #-}
module WorkQueue (
Progress(..),
WorkQueue,
newWorkQueue,
newWorkQueueIO,
spawnWorker,
schedule,
terminate,
workers,
) where
-- base
import Data.Function (fix)
import Data.Functor ((<&>), ($>))
import Data.Foldable (traverse_)
import Data.List (delete)
import Control.Monad.Fix (mfix)
import Control.Exception (try, finally, SomeException, uninterruptibleMask)
import Control.Concurrent (forkIO, forkOS, ThreadId)
-- stm
import Control.Concurrent.STM (STM, atomically, throwSTM)
import Control.Concurrent.STM.TVar
  (TVar, newTVar, newTVarIO, modifyTVar, readTVar, writeTVar)
import Control.Concurrent.STM.TChan
  (TChan, newTChan, newTChanIO, writeTChan, readTChan, unGetTChan)
-- | Snapshot of a scheduled job's state, as observed by polling.
--
-- The 'Eq' and 'Show' instances let callers compare and print snapshots
-- (for logging or tests) whenever the payload types support it.
data Progress a b
  = None        -- ^ No progress has been reported yet.
  | Ongoing a   -- ^ The most recent progress report sent by the job.
  | Finished b  -- ^ The job completed with this result.
  deriving (Eq, Show)
-- | A queue of jobs together with the bookkeeping for the workers serving it.
data WorkQueue = WorkQueue
  { -- | Whether workers are forked with 'forkOS' rather than 'forkIO'.
    bound    :: {-# UNPACK #-} !Bool
    -- | Thread ids of the workers currently registered on this queue.
  , workers_ :: {-# UNPACK #-} !(TVar [ThreadId])
    -- | Pending jobs; a 'Nothing' entry is the termination marker.
  , jobs     :: {-# UNPACK #-} !(TChan (Maybe Job))
  }
-- | Create an empty work queue inside an 'STM' transaction.
--
-- The flag is stored in the queue and decides how its workers are forked.
newWorkQueue :: Bool -> STM WorkQueue
newWorkQueue isBound = do
  registry <- newTVar []
  pending  <- newTChan
  pure (WorkQueue isBound registry pending)
-- | 'IO' counterpart of 'newWorkQueue', built from 'newTVarIO' and
-- 'newTChanIO' so that no transaction is needed.
newWorkQueueIO :: Bool -> IO WorkQueue
newWorkQueueIO isBound = do
  registry <- newTVarIO []
  pending  <- newTChanIO
  pure (WorkQueue isBound registry pending)
-- | Fork a new worker thread that services the queue and return its
-- 'ThreadId'.
--
-- The thread is forked with 'forkOS' when the queue's @bound@ flag is set
-- and with 'forkIO' otherwise. Its id is added to the queue's worker list
-- before the fork and removed again when the worker stops, whether that is
-- because it saw the termination marker or because an exception escaped
-- while it was waiting for a job.
spawnWorker :: WorkQueue -> IO ThreadId
spawnWorker WorkQueue{bound,workers_,jobs} = mfix \tId -> do
  -- 'tId' is the knot-tied result of the fork below; consing it onto the
  -- list does not force it.
  atomically (modifyTVar workers_ (tId:))
  uninterruptibleMask \restore -> fork do
    -- Keep taking jobs until 'nextJob' yields 'Nothing'.
    let serve = fix \loop ->
          restore nextJob >>= traverse_ \j -> runJob restore j >> loop
    -- An exception delivered inside 'restore nextJob' escapes the loop, so
    -- deregistration must be a 'finally' handler; otherwise a killed idle
    -- worker would leave a stale id in the worker list.
    serve `finally` atomically (modifyTVar workers_ (delete tId))
 where
  fork = if bound then forkOS else forkIO
  -- Take the next queue entry. The termination marker is pushed back so
  -- that every other worker gets to see it as well.
  nextJob = atomically $ readTChan jobs >>= \case
    Nothing -> unGetTChan jobs Nothing $> Nothing
    jj      -> pure jj
-- | Enqueue a job and return a transaction for polling its progress.
--
-- The job receives a callback with which it can publish progress reports.
-- The returned transaction yields the latest 'Progress', or rethrows the
-- exception that the job ended with.
schedule
  :: WorkQueue
  -> ((a -> STM ()) -> IO b)
  -> STM (STM (Progress a b))
schedule queue io = do
  (poll, job) <- newJob io
  writeTChan (jobs queue) (Just job)
  pure poll
-- | Append the termination marker ('Nothing') to the job channel; a worker
-- that reads it stops taking jobs.
terminate :: WorkQueue -> STM ()
terminate queue = writeTChan (jobs queue) Nothing
-- | Read the thread ids of the workers currently registered on the queue.
workers :: WorkQueue -> STM [ThreadId]
workers = readTVar . workers_
-- | A unit of work with its report and result types hidden, so that jobs
-- of different types can share one channel.
data Job = forall a b. Job
  { -- | The action a worker runs.
    work     :: IO b
    -- | Shared status cell: 'Left' holds the exception the job ended with,
    -- 'Right' holds its latest 'Progress'.
  , progress :: {-# UNPACK #-} !(TVar (Either SomeException (Progress a b)))
  }
-- | Allocate a status cell (initially 'None') and build both the polling
-- transaction and the 'Job' that share it.
--
-- The polling transaction rethrows a stored exception with 'throwSTM'. The
-- callback handed to the job records each report as 'Ongoing'.
newJob
  :: ((a -> STM ()) -> IO b)
  -> STM (STM (Progress a b), Job)
newJob mkWork = do
  status <- newTVar (Right None)
  let report = writeTVar status . Right . Ongoing
      poll   = readTVar status >>= either throwSTM pure
  pure (poll, Job{ work = mkWork report, progress = status })
-- | Run a job's action through the given @restore@ function and store the
-- outcome in its status cell: the caught exception on failure, otherwise
-- the result wrapped in 'Finished'.
runJob :: (forall x. IO x -> IO x) -> Job -> IO ()
runJob restore Job{work,progress} =
  try (restore work) >>= \outcome ->
    atomically (writeTVar progress (fmap Finished outcome))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment