Skip to content

Instantly share code, notes, and snippets.

@afiore
Last active August 17, 2018 00:34
Show Gist options
  • Save afiore/7336090 to your computer and use it in GitHub Desktop.
Save afiore/7336090 to your computer and use it in GitHub Desktop.
Basic implementation of a thread-pool-like construct in Haskell
import Control.Concurrent
import Control.Concurrent.Chan
import Control.Exception (try)
import Control.Monad

import Control.Monad.Trans
import Control.Monad.Trans.Resource
import qualified Data.ByteString.Lazy as BSL
import qualified Network.HTTP.Conduit as C
import Network.HTTP.Types (Status(..))
-- | One second, expressed in microseconds (the unit 'threadDelay' expects).
second :: Int
second = 1000000

-- | One millisecond, expressed in microseconds.
millisecond :: Int
millisecond = 1000
-- | Address of a page to fetch.
type URL = String

-- | Mailbox through which a worker reports the response status of a request.
type MBox = MVar Status

-- | A unit of work: the URL to request and the mailbox for its result.
data Task = Task
  { url  :: URL   -- ^ target of the request
  , mbox :: MBox  -- ^ receives the response status once the request is done
  }
-- | Pool worker: repeatedly takes a 'Task' off the queue, requests its URL
-- and publishes the response status in the task's mailbox.
--
-- A request that fails with an 'C.HttpException' is logged and the worker
-- carries on with the next task; the failed task's mailbox is left empty, so
-- whoever is waiting on it has to rely on its own timeout.
worker :: C.Manager -> Chan Task -> ResourceT IO ()
worker manager taskQueue = forever $ do
    Task target box <- liftIO $ readChan taskQueue
    outcome <- liftIO $ fetchStatus target
    liftIO $ case outcome of
      Left err -> putStrLn $ target ++ ": request failed: " ++ show err
      Right code -> do
        putStrLn $ target ++ ":" ++ show code
        putMVar box code
  where
    -- Each request runs in its own resource scope, so anything it registers
    -- is released when that request finishes instead of piling up for the
    -- lifetime of the worker. 'try' keeps a failed request from terminating
    -- the worker thread.
    fetchStatus :: URL -> IO (Either C.HttpException Status)
    fetchStatus target = try . runResourceT $ do
      req <- C.parseUrl target
      resp <- C.httpLbs req manager
      return (C.responseStatus resp)
-- | Enqueues one 'Task' per URL on the shared queue, then waits for the
-- results via 'waitWithTimeout', polling every 'millisecond' with a budget of
-- one 'second' per task.
agent :: Chan Task -> [URL] -> IO ()
agent taskQueue targets = do
    putStrLn "Starting agent ..."
    pending <- mapM sendTask targets
    putStrLn "all sent."
    waitWithTimeout millisecond second pending
  where
    -- Pair the URL with a fresh, empty mailbox and hand it to the workers.
    sendTask :: URL -> IO Task
    sendTask target = do
      task <- fmap (Task target) newEmptyMVar
      writeChan taskQueue task
      return task
-- | Polls the tasks until all of them are complete or the time budget runs
-- out, printing which of the two happened.
--
-- Arguments, in order: the pause between polls and the time allowed per task
-- (both in microseconds, the unit of 'threadDelay'), and the tasks to watch.
-- The overall budget is the per-task allowance times the number of tasks.
waitWithTimeout :: Int -> Int -> [Task] -> IO ()
waitWithTimeout sleep taskTimeout tasks = waitForCompletion iterations
  where
    -- A non-positive pause would divide by zero or never count down to zero.
    pause = max 1 sleep
    totalTimeout = taskTimeout * length tasks
    -- Round up so a budget that is not a multiple of the pause is not cut short.
    iterations = (totalTimeout + pause - 1) `div` pause

    -- Check first, then sleep: this way the tasks are inspected one last time
    -- after the final pause, and an empty task list is reported as done.
    waitForCompletion :: Int -> IO ()
    waitForCompletion remaining = allDone tasks >>= step
      where
        step done
          | done = putStrLn "All done!"
          | remaining <= 0 = putStrLn "Timed out waiting for tasks."
          | otherwise = threadDelay pause >> waitForCompletion (remaining - 1)
-- | Reports whether every task's mailbox currently holds a value.
allDone :: [Task] -> IO Bool
allDone tasks = fmap and (mapM hasResult tasks)
  where
    -- A task counts as complete once its mailbox is no longer empty.
    hasResult :: Task -> IO Bool
    hasResult task = fmap not (isEmptyMVar (mbox task))
-- | Sample pages that 'main' hands to the agent.
urls :: [URL]
urls =
  [ "http://example.com"
  , "http://hackage.haskell.org"
  , "http://google.com"
  , "http://facebook.com"
  , "http://urli.st/47A/tbi"
  ]
-- | Starts three workers that share one HTTP manager and one task queue, then
-- runs the agent over 'urls'.
main :: IO ()
main = do
    manager <- C.newManager C.def
    queue <- newChan
    let spawnWorker = forkIO (runResourceT (worker manager queue))
    replicateM_ 3 spawnWorker
    agent queue urls
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment