Skip to content

Instantly share code, notes, and snippets.

@TerrorJack
Last active June 13, 2016 19:09
Show Gist options
  • Save TerrorJack/f8edcb578d32fd21431e89eeb7580833 to your computer and use it in GitHub Desktop.
Save TerrorJack/f8edcb578d32fd21431e89eeb7580833 to your computer and use it in GitHub Desktop.
A simple thread pool. You can dispatch IO actions to the pool for execution.
import Control.Concurrent.Async
import Control.Concurrent.STM.TBChan
import Control.Exception
import Control.Monad
import Data.Foldable
import Data.Vector as V
import GHC.Conc
data PoolConfig = PoolConfig {
threadCount :: Int,
channelCapacity :: Int
}
data Pool = Pool {
asyncs :: Vector (Async ()),
channel :: TBChan (IO ())
}
-- Ignore any kind of exception of an IO action
ignoreException :: IO a -> IO (Either SomeException a)
ignoreException = try
-- Start a thread pool
newPool :: PoolConfig -> IO Pool
newPool conf = do
chan <- newTBChanIO $ channelCapacity conf
v <- V.replicateM (threadCount conf) $ async . forever $ do
m <- atomically $ readTBChan chan
ignoreException m
return $ Pool v chan
-- Dispatch an IO action to the thread pool for execution. Blocks when channel capacity is exceeded
usePool :: Pool -> IO () -> IO ()
usePool pool m = atomically $ writeTBChan (channel pool) m
-- Shut down the thread pool
killPool :: Pool -> IO ()
killPool pool = traverse_ cancel $ asyncs pool
@TerrorJack
Copy link
Author

TerrorJack commented Jun 9, 2016

Dependencies:

  • base
  • async
  • stm-chans
  • vector

Exception handling is naive: any exception occurred during execution of the target IO action is discarded. A more appropriate way is to support passing exception handlers in usePool, and log the unhandled exceptions.

Another drawback of runPool: doesn't support sending an IO a and retrieving result a.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment