Last active
August 29, 2015 14:08
-
-
Save intolerable/283141790839bbf79b17 to your computer and use it in GitHub Desktop.
A quick port of John Graham-Cumming's dotGo talk code to Haskell.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| module DotGo where | |
| import Control.Concurrent.Async | |
| import Control.Monad | |
| import Pipes | |
| import Pipes.Concurrent | |
| import qualified Pipes.Prelude as Pipes | |
-- | Read lines from STDIN, pass each one through 'doWork' on a pool of
-- concurrent workers, and write the results to STDOUT.
--
-- This is 'groupWork' specialised to the standard streams; the channel and
-- worker wiring lives there rather than being repeated here.
main :: IO ()
main = groupWork workerCount Pipes.stdinLn doWork Pipes.stdoutLn
  where
    -- how many async workers pull from the shared task channel
    workerCount = 1000
-- | The per-item processing stage run by every worker.
--
-- It repeatedly awaits one value from upstream and yields one value
-- downstream. At the moment the value is forwarded unchanged.
doWork :: Pipe String String IO ()
doWork = forever (await >>= process)
  where
    -- do some work in here
    process item = yield item
-- | Run a fan-out / fan-in worker pool built from pipes.
--
-- Everything produced by @inputPipe@ is pushed into a task mailbox;
-- @workers@ concurrent copies of @processPipe@ pull from that mailbox and
-- push what they yield into a results mailbox; @outputPipe@ consumes the
-- results mailbox. The call returns once all three stages have finished.
--
-- Both mailboxes are sealed explicitly (via 'spawn'') as soon as their
-- writers are done, so readers stop when the mailbox has been drained
-- instead of waiting for the garbage collector to notice that the writing
-- end is unreachable. The stages are combined with 'concurrently' /
-- 'mapConcurrently', so an exception in any stage cancels the others and is
-- rethrown to the caller.
--
-- A non-positive @workers@ starts no workers: the input is still read, but
-- nothing reaches @outputPipe@.
groupWork :: Int -> Producer a IO () -> Pipe a b IO () -> Consumer b IO () -> IO ()
groupWork workers inputPipe processPipe outputPipe = do
  (tasksOut, tasksIn, sealTasks) <- spawn' Unbounded
  (resultsOut, resultsIn, sealResults) <- spawn' Unbounded
  let -- push every input into the task mailbox, then seal it so the
      -- workers stop once the remaining tasks have been taken
      feedTasks = do
        runEffect $ inputPipe >-> toOutput tasksOut
        atomically sealTasks
      -- one worker: pull tasks, process them, push the results
      runWorker =
        runEffect $ fromInput tasksIn >-> processPipe >-> toOutput resultsOut
      -- run the whole pool; only when every worker is done can no further
      -- result arrive, so that is the point to seal the results mailbox
      runWorkers = do
        _ <- mapConcurrently id (replicate workers runWorker)
        atomically sealResults
      -- hand every result to the caller's consumer
      drainResults =
        runEffect $ fromInput resultsIn >-> outputPipe
  void $ concurrently feedTasks (concurrently runWorkers drainResults)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment