Last active
June 13, 2016 19:09
-
-
Save TerrorJack/f8edcb578d32fd21431e89eeb7580833 to your computer and use it in GitHub Desktop.
A simple thread pool. You can dispatch IO actions to the pool for execution.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Control.Concurrent.Async | |
import Control.Concurrent.STM.TBChan | |
import Control.Exception | |
import Control.Monad | |
import Data.Foldable | |
import Data.Vector as V | |
import GHC.Conc | |
-- | Sizing parameters for a thread pool.
data PoolConfig = PoolConfig
  { threadCount :: Int
    -- ^ Number of worker threads that 'newPool' starts.
  , channelCapacity :: Int
    -- ^ Capacity passed to the bounded channel that queues pending actions.
  }
-- | Handle to a running thread pool.
data Pool = Pool
  { asyncs :: Vector (Async ())
    -- ^ One 'Async' handle per worker thread.
  , channel :: TBChan (IO ())
    -- ^ Bounded queue from which the workers take the actions to run.
  }
-- | Run an action and capture any synchronous exception it throws.
--
-- Returns 'Right' with the result on success and 'Left' with the exception
-- when the action fails with an ordinary (synchronous) exception.
--
-- Asynchronous exceptions -- anything wrapped in 'SomeAsyncException', such
-- as 'ThreadKilled' -- are rethrown instead of being captured. Swallowing
-- them would make the calling thread impossible to cancel while it is busy
-- running an action.
ignoreException :: IO a -> IO (Either SomeException a)
ignoreException action = try action >>= either classify (return . Right)
  where
    -- Capture ordinary failures, but let cancellation requests propagate.
    classify e = case fromException e :: Maybe SomeAsyncException of
      Just _ -> throwIO e
      Nothing -> return (Left e)
-- | Create a pool: allocate the bounded task queue and fork the workers.
--
-- The queue is created with @channelCapacity conf@ and @threadCount conf@
-- workers are started. Every worker loops forever, taking the next action
-- off the queue and running it through 'ignoreException'.
newPool :: PoolConfig -> IO Pool
newPool conf = do
  queue <- newTBChanIO (channelCapacity conf)
  workers <- V.replicateM (threadCount conf) (async (workerLoop queue))
  return (Pool workers queue)
  where
    -- Block until a task is available, run it, discard the outcome, repeat.
    workerLoop queue =
      forever (atomically (readTBChan queue) >>= ignoreException)
-- | Hand an action to the pool for execution by one of its workers.
--
-- The action is written to the pool's bounded channel in a single STM
-- transaction; the call blocks while the channel is at capacity.
usePool :: Pool -> IO () -> IO ()
usePool pool task = atomically (writeTBChan queue task)
  where
    queue = channel pool
-- | Shut the pool down by cancelling each worker thread in turn.
killPool :: Pool -> IO ()
killPool pool = for_ (asyncs pool) cancel
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Dependencies:
base
async
stm-chans
vector
Exception handling is naive: any exception that occurs during execution of the target `IO` action is discarded. A more appropriate approach would be to support passing exception handlers to `usePool`, and to log unhandled exceptions.

Another drawback of `usePool`: it doesn't support sending an `IO a` and retrieving the result `a`.