Skip to content

Instantly share code, notes, and snippets.

@DarinM223
Last active February 11, 2021 15:13
Show Gist options
  • Save DarinM223/378844b34c35e883de8998782537b58b to your computer and use it in GitHub Desktop.
Save DarinM223/378844b34c35e883de8998782537b58b to your computer and use it in GitHub Desktop.
Combining producers in streaming libraries
module ConduitTest where
import Data.Conduit.Internal (sourceToPipe, unconsM)
import Data.Foldable (for_)
import Control.Monad (forever, void)
import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT))
import Conduit
-- | Source that yields the integers 0 through 5, in ascending order.
lotsOfInts :: Monad m => ConduitT () Int m ()
lotsOfInts = mapM_ yield [0 .. 5]
-- | Source that yields the 'show'n form of each number from 0 through 100,
-- in ascending order.
lotsOfStrings :: Monad m => ConduitT () String m ()
lotsOfStrings = mapM_ (yield . show) [0 .. 100]
-- | Pair up the outputs of two sources, element by element.
--
-- Each round steps the first source and then the second with 'unconsM'; the
-- two values are yielded as a tuple and the loop continues with both
-- remainders. The result ends as soon as either source reports 'Nothing'.
--
-- The second source is only stepped after the first has produced a value, so
-- once the first source is finished the second one's effects are not run and
-- no element is pulled from it just to be thrown away. If the second source
-- finishes first, the value already taken from the first source in that round
-- is dropped.
combineProducers
  :: Monad m
  => ConduitT () o m ()
  -> ConduitT () o' m ()
  -> ConduitT () (o, o') m ()
combineProducers c1 c2 = go (sourceToPipe c1) (sourceToPipe c2)
  where
    go p1 p2 = do
      result1 <- lift $ unconsM p1
      case result1 of
        -- First source is done: stop without touching the second one.
        Nothing -> return ()
        Just (v1, p1') -> do
          result2 <- lift $ unconsM p2
          case result2 of
            Nothing -> return ()
            Just (v2, p2') -> yield (v1, v2) >> go p1' p2'
-- | Interleave several sources of the same output type in round-robin order.
--
-- The source at the head of the list is stepped with 'unconsM'; its value is
-- yielded and its remainder is moved to the back of the list. The whole stream
-- ends as soon as the source at the head reports 'Nothing', even if the other
-- sources could still produce values. An empty list gives an empty stream.
combineManySameProducers
  :: Monad m => [ConduitT () o m ()] -> ConduitT () o m ()
combineManySameProducers = go . fmap sourceToPipe
  where
    -- No sources at all: nothing to yield. Only reachable for an empty input,
    -- because every recursive call keeps the list at the same length.
    go [] = return ()
    go (p : ps) = do
      result <- lift $ unconsM p
      case result of
        Nothing -> return ()
        -- Re-queue the remainder behind the sources still waiting for a turn.
        -- Note that '++' walks the whole list on every step.
        Just (v, p') -> yield v >> go (ps ++ [p'])
-- | Conduit that discards the first of every two upstream values and forwards
-- the second. It finishes as soon as an 'await' returns 'Nothing', whether
-- that happens on the value to skip or on the value to keep.
dropEveryOther :: Monad m => ConduitT a a m ()
dropEveryOther = skipOne
  where
    -- Consume and ignore one value, then move on to the value to keep.
    skipOne = await >>= maybe (return ()) (const keepOne)
    -- Forward one value downstream, then start the next pair.
    keepOne = await >>= maybe (return ()) (\kept -> yield kept >> skipOne)
-- | Sink that prints one line per incoming pair, in the form
-- @i: \<show of the Int\> s: \<show of the String\>@, and finishes when
-- 'await' returns 'Nothing'.
consume :: MonadIO m => ConduitT (Int, String) Void m ()
consume = await >>= maybe (return ()) printAndLoop
  where
    printAndLoop (i, s) = do
      liftIO . putStrLn $ "i: " ++ show i ++ " s: " ++ show s
      consume
-- | Sink that prints one line per incoming 'Int', in the form
-- @i: \<show of the Int\>@, and finishes when 'await' returns 'Nothing'.
consume' :: MonadIO m => ConduitT Int Void m ()
consume' = await >>= maybe (return ()) printAndLoop
  where
    printAndLoop i = do
      liftIO . putStrLn $ "i: " ++ show i
      consume'
-- | Run the two demo pipelines one after the other: first the zipped
-- int/string source filtered through 'dropEveryOther', then five copies of
-- 'lotsOfInts' interleaved by 'combineManySameProducers'.
main :: IO ()
main = do
  runConduit zipped
  runConduit interleaved
  where
    zipped =
      combineProducers lotsOfInts lotsOfStrings
        .| dropEveryOther
        .| consume
    interleaved =
      combineManySameProducers (replicate 5 lotsOfInts) .| consume'
module PipesTest where
import Control.Monad (forever)
import Data.Foldable (for_)
import Pipes
-- | Producer that yields the integers 0 through 5, in ascending order.
lotsOfInts :: Monad m => Producer Int m ()
lotsOfInts = mapM_ yield [0 .. 5]
-- | Producer that yields the 'show'n form of each number from 0 through 100,
-- in ascending order.
lotsOfStrings :: Monad m => Producer String m ()
lotsOfStrings = mapM_ (yield . show) [0 .. 100]
-- | Pair up the outputs of two producers, element by element.
--
-- Each round steps the first producer and then the second with 'next'; the
-- two values are yielded as a tuple and the loop continues with both
-- remainders. The result returns the @r@ of whichever producer finishes first.
--
-- The second producer is only stepped after the first has produced a value,
-- so once the first producer is finished the second one's effects are not run
-- and no element is pulled from it just to be thrown away. If the second
-- producer finishes first, the value already taken from the first producer in
-- that round is dropped.
combineProducers
  :: Monad m => Producer a m r -> Producer b m r -> Producer (a, b) m r
combineProducers = go
  where
    go p1 p2 = do
      result1 <- lift $ next p1
      case result1 of
        -- First producer is done: return without touching the second one.
        Left r -> return r
        Right (v1, p1') -> do
          result2 <- lift $ next p2
          case result2 of
            Left r -> return r
            Right (v2, p2') -> yield (v1, v2) >> go p1' p2'
-- | Interleave several producers of the same output type in round-robin order.
--
-- The producer at the head of the list is stepped with 'next'; its value is
-- yielded and its remainder is moved to the back of the list. The whole stream
-- returns as soon as the producer at the head finishes, using that producer's
-- return value, even if the other producers could still yield values.
--
-- The list must be non-empty: with no producers there is no value of type @r@
-- to return, so an empty list raises an 'error'.
combineManySameProducers :: Monad m => [Producer a m r] -> Producer a m r
combineManySameProducers = go
  where
    -- Only reachable for an empty input, because every recursive call keeps
    -- the list at the same length. Fail with a descriptive message instead of
    -- an anonymous pattern-match failure.
    go [] = error "combineManySameProducers: empty list of producers"
    go (p : ps) = do
      result <- lift $ next p
      case result of
        Left r -> return r
        -- Re-queue the remainder behind the producers still waiting for a
        -- turn. Note that '++' walks the whole list on every step.
        Right (v, p') -> yield v >> go (ps ++ [p'])
-- | Pipe that, forever, awaits two upstream values, discards the first and
-- yields the second downstream.
dropEveryOther :: Monad m => Pipe a a m r
dropEveryOther = forever (await >> await >>= yield)
-- | Consumer that loops forever, printing one line per awaited pair in the
-- form @i: \<show of the Int\> s: \<show of the String\>@.
consume :: MonadIO m => Consumer (Int, String) m ()
consume = forever (await >>= printPair)
  where
    printPair (i, s) =
      liftIO (putStrLn ("i: " ++ show i ++ " s: " ++ show s))
-- | Consumer that loops forever, printing one line per awaited 'Int' in the
-- form @i: \<show of the Int\>@.
consume' :: MonadIO m => Consumer Int m ()
consume' = forever (await >>= printInt)
  where
    printInt i = liftIO (putStrLn ("i: " ++ show i))
-- | Run the two demo pipelines one after the other: first the zipped
-- int/string producer filtered through 'dropEveryOther', then five copies of
-- 'lotsOfInts' interleaved by 'combineManySameProducers'.
main :: IO ()
main = do
  runEffect zipped
  runEffect interleaved
  where
    zipped =
      combineProducers lotsOfInts lotsOfStrings
        >-> dropEveryOther
        >-> consume
    interleaved =
      combineManySameProducers (replicate 5 lotsOfInts) >-> consume'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment