Last active
August 17, 2018 00:34
-
-
Save afiore/7336090 to your computer and use it in GitHub Desktop.
Basic implementation of a thread pool like construct in Haskell
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import qualified Data.ByteString.Lazy as BSL | |
import Control.Monad | |
import Control.Monad.Trans | |
import Control.Monad.Trans.Resource | |
import Control.Concurrent | |
import Control.Concurrent.Chan | |
import qualified Network.HTTP.Conduit as C | |
import Network.HTTP.Types (Status(..)) | |
second = 1000000 | |
millisecond = 1000 | |
type URL = String | |
type MBox = MVar Status | |
data Task | |
= Task | |
{ url :: URL | |
, mbox :: MBox | |
} | |
worker :: C.Manager -> Chan Task -> ResourceT IO () | |
worker manager taskQueue = loop | |
where | |
loop = do | |
(Task url mbox) <- liftIO $ readChan taskQueue | |
req <- C.parseUrl url | |
resp <- C.httpLbs req manager | |
let code = C.responseStatus resp | |
liftIO . print $ url ++ ":" ++ show code | |
liftIO $ putMVar mbox code | |
loop | |
agent :: Chan Task -> [URL] -> IO () | |
agent tasks urls = do | |
print "Starting agent ..." | |
tasks <- mapM sendTask urls | |
print "all sent." | |
waitWithTimeout millisecond second tasks | |
where | |
sendTask :: URL -> IO Task | |
sendTask url = do | |
task <- Task url `liftM` newEmptyMVar | |
writeChan tasks task | |
return task | |
waitWithTimeout :: Int -> Int -> [Task] -> IO () | |
waitWithTimeout sleep taskTimeout tasks = do | |
-- totalTimeout should always be divisible by sleep | |
let totalTimeout = taskTimeout * (length tasks) | |
iterations = totalTimeout `div` sleep | |
waitForCompletion iterations | |
where | |
waitForCompletion :: Int -> IO () | |
waitForCompletion 0 = return () | |
waitForCompletion i = do | |
done <- allDone tasks | |
if done | |
then putStrLn "All done!" | |
else | |
threadDelay sleep >> waitForCompletion (i - 1) | |
allDone :: [Task] -> IO Bool | |
allDone tasks = (all (== True)) `liftM` mapM isComplete tasks | |
where | |
isComplete :: Task -> IO Bool | |
isComplete (Task _ mb) = not `liftM` isEmptyMVar mb | |
urls :: [URL] | |
urls = [ "http://example.com" | |
, "http://hackage.haskell.org" | |
, "http://google.com" | |
, "http://facebook.com" | |
, "http://urli.st/47A/tbi" | |
] | |
main :: IO () | |
main = do | |
manager <- C.newManager $ C.def | |
tasks <- newChan | |
replicateM_ 3 $ forkIO $ runResourceT (worker manager tasks) | |
agent tasks urls |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment