Last active
February 11, 2021 15:13
-
-
Save DarinM223/378844b34c35e883de8998782537b58b to your computer and use it in GitHub Desktop.
Combining producers in streaming libraries
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module ConduitTest where | |
import Data.Conduit.Internal (sourceToPipe, unconsM) | |
import Data.Foldable (for_) | |
import Control.Monad (forever, void) | |
import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT)) | |
import Conduit | |
-- | A source that yields the integers 0 through 5, in order, then finishes.
lotsOfInts :: Monad m => ConduitT () Int m ()
lotsOfInts = mapM_ yield [0 .. 5]
-- | A source that yields the 'show' rendering of each number from 0 to 100,
-- in order, then finishes.
lotsOfStrings :: Monad m => ConduitT () String m ()
lotsOfStrings = mapM_ (yield . show) [0 .. 100]
-- | Zip two sources into a single source of pairs.
--
-- On every round one element is pulled from each source and the pair is
-- yielded. The combined source finishes as soon as either side has no more
-- elements.
--
-- Both sources are stepped /before/ either result is inspected, so when the
-- first source is exhausted the second one has still been advanced once, and
-- when only the second is exhausted the element just taken from the first is
-- discarded.
combineProducers
  :: Monad m
  => ConduitT () o m ()
  -> ConduitT () o' m ()
  -> ConduitT () (o, o') m ()
combineProducers c1 c2 = zipStep (sourceToPipe c1) (sourceToPipe c2)
  where
    zipStep left right = do
      stepL <- lift (unconsM left)
      stepR <- lift (unconsM right)
      -- The Maybe applicative is Nothing when either side is Nothing.
      case (,) <$> stepL <*> stepR of
        Nothing -> pure ()
        Just ((a, left'), (b, right')) -> do
          yield (a, b)
          zipStep left' right'
-- | Interleave any number of sources of the same element type, round-robin.
--
-- The source at the front of the queue is stepped; its element is yielded and
-- the remainder of that source is moved to the back of the queue. The
-- combined source finishes as soon as the source currently at the front is
-- exhausted, even if other sources still have elements.
--
-- An empty list of sources yields nothing.
combineManySameProducers
  :: Monad m => [ConduitT () o m ()] -> ConduitT () o m ()
combineManySameProducers = go . fmap sourceToPipe
  where
    -- No sources at all: there is nothing to emit. Without this equation an
    -- empty input list crashed with a pattern-match failure.
    go [] = return ()
    go (p : ps) = do
      result <- lift $ unconsM p
      case result of
        Nothing -> return ()
        -- Rotate: the stepped source goes to the back of the queue.
        Just (v, p') -> yield v >> go (ps ++ [p'])
-- | Forward every second upstream element and drop the ones in between.
--
-- Each round awaits two elements and yields only the second. The loop runs
-- in 'MaybeT', so it ends the first time 'await' returns 'Nothing'.
dropEveryOther :: Monad m => ConduitT a a m ()
dropEveryOther = void . runMaybeT . forever $ do
  _skipped <- MaybeT await
  kept <- MaybeT await
  lift (yield kept)
-- | Sink that prints every incoming @(Int, String)@ pair on its own line.
--
-- Runs until 'await' returns 'Nothing', at which point the 'MaybeT' loop
-- stops.
consume :: MonadIO m => ConduitT (Int, String) Void m ()
consume = void . runMaybeT . forever $
  MaybeT await >>= \(i, s) ->
    liftIO (putStrLn (concat ["i: ", show i, " s: ", show s]))
-- | Sink that prints every incoming 'Int' on its own line.
--
-- Runs until 'await' returns 'Nothing', at which point the 'MaybeT' loop
-- stops.
consume' :: MonadIO m => ConduitT Int Void m ()
consume' = void . runMaybeT . forever $
  MaybeT await >>= \i ->
    liftIO (putStrLn ("i: " ++ show i))
-- | Run both demo pipelines in sequence: first the zipped ints/strings
-- source filtered through 'dropEveryOther', then five copies of 'lotsOfInts'
-- interleaved round-robin.
main :: IO ()
main = do
  runConduit zippedPipeline
  runConduit interleavedPipeline
  where
    zippedPipeline =
      combineProducers lotsOfInts lotsOfStrings
        .| dropEveryOther
        .| consume
    interleavedPipeline =
      combineManySameProducers (replicate 5 lotsOfInts) .| consume'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module PipesTest where | |
import Control.Monad (forever) | |
import Data.Foldable (for_) | |
import Pipes | |
-- | A producer that yields the integers 0 through 5, in order, then returns.
lotsOfInts :: Monad m => Producer Int m ()
lotsOfInts = for_ [0 .. 5] yield
-- | A producer that yields the 'show' rendering of each number from 0 to
-- 100, in order, then returns.
lotsOfStrings :: Monad m => Producer String m ()
lotsOfStrings = for_ [0 .. 100] (yield . show)
-- | Zip two producers into a single producer of pairs.
--
-- On every round both producers are stepped with 'next' and the pair of
-- their elements is yielded. As soon as either producer returns, the
-- combined producer returns that value; if both return in the same round,
-- the first producer's value is used.
--
-- Both producers are stepped /before/ either result is inspected, so when
-- the first one has returned the second has still been advanced once, and
-- when only the second has returned the element just taken from the first
-- is discarded.
combineProducers
  :: Monad m => Producer a m r -> Producer b m r -> Producer (a, b) m r
combineProducers left right = do
  stepL <- lift (next left)
  stepR <- lift (next right)
  -- The Either applicative keeps the leftmost 'Left', so the first
  -- producer's return value wins over the second's.
  case (,) <$> stepL <*> stepR of
    Left r -> pure r
    Right ((a, left'), (b, right')) -> do
      yield (a, b)
      combineProducers left' right'
-- | Interleave any number of producers of the same element type,
-- round-robin.
--
-- The producer at the front of the queue is stepped with 'next'; its element
-- is yielded and the remainder of that producer is moved to the back of the
-- queue. As soon as the producer currently at the front returns, the
-- combined producer returns that value, even if other producers still have
-- elements.
--
-- The list must be non-empty. The result type @r@ is chosen by the caller,
-- so with no producers there is no value to return, and the function fails
-- with a descriptive 'error'.
combineManySameProducers :: Monad m => [Producer a m r] -> Producer a m r
combineManySameProducers = go
  where
    -- Explicit failure in place of an opaque pattern-match crash.
    go [] =
      error
        "combineManySameProducers: empty producer list \
        \(no producer available to supply the return value)"
    go (p : ps) = do
      result <- lift $ next p
      case result of
        Left r -> return r
        -- Rotate: the stepped producer goes to the back of the queue.
        Right (v, p') -> yield v >> go (ps ++ [p'])
-- | Forward every second upstream element and drop the ones in between.
--
-- Each round awaits two elements and yields only the second; the loop
-- repeats forever.
dropEveryOther :: Monad m => Pipe a a m r
dropEveryOther = forever (await >> await >>= yield)
-- | Consumer that prints every incoming @(Int, String)@ pair on its own
-- line, looping forever.
consume :: MonadIO m => Consumer (Int, String) m ()
consume = forever $
  await >>= \(i, s) ->
    liftIO (putStrLn (concat ["i: ", show i, " s: ", show s]))
-- | Consumer that prints every incoming 'Int' on its own line, looping
-- forever.
consume' :: MonadIO m => Consumer Int m ()
consume' = forever $
  await >>= \i ->
    liftIO (putStrLn ("i: " ++ show i))
-- | Run both demo pipelines in sequence: first the zipped ints/strings
-- producer filtered through 'dropEveryOther', then five copies of
-- 'lotsOfInts' interleaved round-robin.
main :: IO ()
main = do
  runEffect zippedPipeline
  runEffect interleavedPipeline
  where
    zippedPipeline =
      combineProducers lotsOfInts lotsOfStrings
        >-> dropEveryOther
        >-> consume
    interleavedPipeline =
      combineManySameProducers (replicate 5 lotsOfInts) >-> consume'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment