Last active
August 29, 2015 14:25
-
-
Save gavinwahl/8e6f90b7143bbac9bced to your computer and use it in GitHub Desktop.
Is this what pipes does?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE EmptyCase #-} | |
import Data.Void | |
import Control.Applicative | |
import Control.Monad.Trans | |
import Control.Monad | |
import Control.Monad.Writer | |
import System.IO | |
import qualified Data.ByteString as B | |
data Pipeline i o m a | |
= Done a | |
| Yield o (m (Pipeline i o m a)) | |
| Await (i -> m (Pipeline i o m a)) | |
| Eff (m (Pipeline i o m a)) | |
type Producer o m a = Pipeline () o m a | |
type Consumer i m a = Pipeline i Void m a | |
instance (Functor m, Monad m) => Functor (Pipeline i o m) where | |
fmap f (Done a) = Done (f a) | |
fmap f (Yield o m) = Yield o $ do | |
next <- m | |
return $ fmap f next | |
fmap f (Await c) = Await $ \i -> fmap f <$> c i | |
fmap f (Eff m) = Eff $ fmap f <$> m | |
instance (Applicative m, Monad m) => Applicative (Pipeline i o m) where | |
pure = Done | |
(<*>) = ap | |
instance (Functor m, Monad m) => Monad (Pipeline i o m) where | |
return = Done | |
(Done m) >>= f = f m | |
(Yield o m) >>= f = Yield o $ (>>= f) <$> m | |
(Await c) >>= f = Await $ \i -> (>>=f) <$> c i | |
(Eff m) >>= f = Eff $ do | |
p <- m | |
return $ p >>= f | |
instance MonadTrans (Pipeline i o) where | |
lift m = Eff $ do | |
x <- m | |
return $ Done x | |
instance (MonadIO m, Functor m) => MonadIO (Pipeline i o m) where | |
liftIO = lift . liftIO | |
yield :: (Monad m, Functor m) => o -> Pipeline i o m () | |
yield x = Yield x (return $ return ()) | |
await :: (Monad m, Functor m) => Pipeline i o m i | |
await = Await (return . return) | |
(=$=) :: (Monad m, Applicative m) => Pipeline i o m () -> Pipeline (Maybe o) p m b -> Pipeline i p m b | |
(Done _) =$= (Done b) = Done b | |
(Done a) =$= (Yield o n) = Yield o $ do | |
p <- n | |
return $ Done a =$= p | |
(Done a) =$= (Eff m) = Eff $ do | |
p <- m | |
return $ Done a =$= p | |
(Done a) =$= (Await f) = Eff $ do | |
p <- f Nothing | |
return $ Done a =$= p | |
(Yield o c) =$= (Await f) = Eff $ do | |
c' <- c | |
p' <- f (Just o) | |
return $ c' =$= p' | |
(Yield _ _) =$= (Done a) = Done a | |
(Yield o c) =$= (Eff m) = Eff $ do | |
p <- m | |
return $ Yield o c =$= p | |
(Await c) =$= f = Await $ \i -> do | |
n <- c i | |
return $ n =$= f | |
a =$= (Yield o n') = Yield o $ do | |
p <- n' | |
return $ a =$= p | |
-- run the consumer as far as possible before requesting new values | |
-- from the producer | |
a =$= (Eff n) = Eff $ (=$=) <$> pure a <*> n | |
(Eff m) =$= n = Eff $ (=$=) <$> m <*> pure n | |
runPipeline :: (Monad m) => Pipeline () Void m a -> m a | |
runPipeline (Done a) = return a | |
runPipeline (Yield o _) = case o of { } | |
runPipeline (Await f) = f () >>= runPipeline | |
runPipeline (Eff m) = m >>= runPipeline | |
-- utilities | |
mapPipeline :: (Functor m, Monad m) => (a -> b) -> Pipeline (Maybe a) b m () | |
mapPipeline f = do | |
x <- await | |
case x of | |
Just v -> yield (f v) >> mapPipeline f | |
Nothing -> return () | |
collect :: (Monad m) => m (Maybe a) -> m [a] | |
collect m = do | |
v <- m | |
case v of | |
(Just x) -> collect m >>= (\rest -> return $ x:rest) | |
Nothing -> return [] | |
produce :: (Functor m, Monad m) => [a] -> Producer a m () | |
produce [] = return () | |
produce (x:xs) = yield x >> produce xs | |
-- Demo: | |
foo :: IO [Integer] | |
foo = runPipeline $ producer =$= mapPipeline (+1) =$= collect await | |
where producer :: (MonadIO m, Functor m) => Producer Integer m () | |
producer = do | |
yield 1 | |
yield 2 | |
liftIO $ putStr "How many times should I yield? " | |
count <- liftIO readLn | |
replicateM_ count $ yield 10101 | |
yield 3 | |
bar :: [String] | |
bar = snd $ runWriter $ runPipeline $ producer =$= consumer | |
where producer :: Producer String (Writer [String]) () | |
producer = do | |
lift $ tell ["producer: going to yield a"] | |
yield "a" | |
lift $ tell ["producer: yielded a"] | |
lift $ tell ["producer: some stuff after yielding a"] | |
consumer :: Consumer (Maybe String) (Writer [String]) () | |
consumer = do | |
(Just x) <- await | |
lift $ tell ["consumer: got " ++ x] | |
lift $ tell ["consumer: doing some stuff"] | |
-- File IO | |
cat :: IO () | |
cat = runPipeline $ handleToProducer stdin =$= handleToConsumer stdout | |
handleToProducer :: (MonadIO m, Functor m) => Handle -> Producer B.ByteString m () | |
handleToProducer handle = do | |
chunk <- liftIO $ B.hGetSome handle 1024 | |
if chunk == B.empty | |
then return () | |
else yield chunk >> handleToProducer handle | |
handleToConsumer :: (MonadIO m, Functor m) => Handle -> Consumer (Maybe B.ByteString) m () | |
handleToConsumer handle = do | |
c <- await | |
case c of | |
(Just chunk) -> liftIO (B.hPut handle chunk) >> handleToConsumer handle | |
Nothing -> return () |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment