Skip to content

Instantly share code, notes, and snippets.

@intolerable
Last active August 29, 2015 14:08
Show Gist options
  • Save intolerable/283141790839bbf79b17 to your computer and use it in GitHub Desktop.
Save intolerable/283141790839bbf79b17 to your computer and use it in GitHub Desktop.
a quick port of john graham-cumming's dotgo talk code in haskell
module DotGo where
import Control.Concurrent.Async
import Control.Monad
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as Pipes
main :: IO ()
main = do
-- create a channel for the tasks. the workers will pull their work from here
(tasksOut, tasksIn) <- spawn Unbounded
-- create a channel for the results. the workers will put their work here when it is done
(resultsOut, resultsIn) <- spawn Unbounded
-- spawn an async that will pipe everything from STDIN into the input channel
stdinTask <- async $ do
runEffect $ Pipes.stdinLn >-> toOutput tasksOut
-- spawn another async that will pipe everything from the results channel to STDOUT
stdoutTask <- async $ do
runEffect $ fromInput resultsIn >-> Pipes.stdoutLn
-- spawn a lot of async workers which will all pull from the tasks channel,
-- process it, and then spit it into the results channel
workerTasks <- replicateM 1000 $ async $
runEffect $ fromInput tasksIn >-> doWork >-> toOutput resultsOut
-- wait for all of our async tasks to complete
mapM_ wait $ stdinTask : stdoutTask : workerTasks
doWork :: Pipe String String IO ()
doWork = forever $ do
x <- await
-- do some work in here
yield x
groupWork :: Int -> Producer a IO () -> Pipe a b IO () -> Consumer b IO () -> IO ()
groupWork workers inputPipe processPipe outputPipe = do
(tasksOut, tasksIn) <- spawn Unbounded
(resultsOut, resultsIn) <- spawn Unbounded
stdinTask <- async $
runEffect $ inputPipe >-> toOutput tasksOut
stdoutTask <- async $
runEffect $ fromInput resultsIn >-> outputPipe
workerTasks <- replicateM workers $ async $
runEffect $ fromInput tasksIn >-> processPipe >-> toOutput resultsOut
mapM_ wait $ stdinTask : stdoutTask : workerTasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment