Skip to content

Instantly share code, notes, and snippets.

@LSLeary
Created August 19, 2024 13:17
Show Gist options
  • Save LSLeary/e139dfe025e2ed5a47040106b321dbc3 to your computer and use it in GitHub Desktop.
Save LSLeary/e139dfe025e2ed5a47040106b321dbc3 to your computer and use it in GitHub Desktop.
Transparently hand off jobs to a pool of workers.
{-# LANGUAGE DerivingVia, LambdaCase, BlockArguments #-}
module Pool (
Pool,
runPool,
schedule,
withRunInIO,
) where
-- base
import Data.Functor ((<&>), ($>))
import Data.Foldable (traverse_)
import Control.Applicative (Alternative)
import Control.Monad (forever, replicateM)
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Fix (MonadFix)
import Control.Exception (try, SomeException)
import Control.Concurrent (ThreadId, forkIO, killThread)
-- stm
import Control.Concurrent.STM (STM, atomically, throwSTM)
import Control.Concurrent.STM.TMVar
(TMVar, newEmptyTMVarIO, writeTMVar, readTMVar)
import Control.Concurrent.STM.TChan (TChan, newTChanIO, readTChan, writeTChan)
-- transformers
import Control.Monad.Trans.Reader (ReaderT(..))
newtype Pool a = Pool{ unPool :: TChan Job -> IO a }
deriving (Functor, Applicative, Alternative, Monad, MonadIO, MonadFix)
via ReaderT (TChan Job) IO
runPool :: Int -> Pool a -> IO a
runPool size pool = do
jobs <- newTChanIO
(threads, a) <- liftA2 (,) (spinOff size) pool `unPool` jobs
traverse_ killThread threads $> a
where
spinOff :: Int -> Pool [ThreadId]
spinOff n = withRunInIO \run -> (replicateM n . forkIO . forever) do
run (next >>= runJob)
where
next :: Pool Job
next = Pool (atomically . readTChan)
schedule :: Pool a -> Pool (STM a)
schedule pa = Pool \jobs -> do
(awaitr, job) <- newJob pa
atomically (writeTChan jobs job) $> awaitr
withRunInIO :: ((forall a. Pool a -> IO a) -> IO b) -> Pool b
withRunInIO k = Pool \jobs ->
k (`unPool` jobs)
data Job where
Job :: { work :: Pool a
, result :: {-# UNPACK #-} !(TMVar (Either SomeException a))
} -> Job
newJob :: Pool a -> IO (STM a, Job)
newJob work = newEmptyTMVarIO <&> \result ->
(await result, Job{work,result})
where
await result = readTMVar result >>= either throwSTM pure
runJob :: Job -> Pool ()
runJob Job{work,result} = withRunInIO \run -> do
esa <- try (run work)
atomically (writeTMVar result esa)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment