-
-
Save danielwaterworth/1915841 to your computer and use it in GitHub Desktop.
{-# LANGUAGE FlexibleInstances, DeriveDataTypeable #-} | |
import Control.Applicative (Applicative (..))
import Control.Exception
import Control.Monad
import Control.Monad.Trans
import Data.Char
import Data.Typeable
import System.Exit (exitFailure)
import System.IO
import System.IO.Error (isEOFError)
-- | The outcome a pipe is handed back after it asks to pull or push.
data Result i e
  = Err e      -- ^ the operation failed with error @e@
  | Success i  -- ^ the operation went through, carrying a value of type @i@
  | EOF        -- ^ no more values can be exchanged
-- | A single step of a pipe: what the pipe wants to do next.
data Step i o e m a
  = Done a
    -- ^ the pipe finished with a final value
  | Pull (Result i e -> Pipe i o e m a)
    -- ^ the pipe wants an input; the continuation receives the outcome
  | Push o (Result () e -> Pipe i o e m a)
    -- ^ the pipe offers an output; the continuation receives the outcome
  | Error e
    -- ^ the pipe failed
-- | The main pipe type: an action in the base monad @m@ that yields the
-- pipe's next 'Step'.
data Pipe i o e m a = Pipe
  { runPipe :: m (Step i o e m a)
  }
-- Functor and Applicative are superclasses of Monad (GHC 7.10 and later), so
-- both instances must exist for the Monad instance to be accepted. They are
-- defined through the monadic operations so all three instances agree.

-- | Map over the final result of a pipe.
instance Monad m => Functor (Pipe i o e m) where
  fmap = liftM

-- | 'pure' is a pipe that finishes immediately with the given value.
instance Monad m => Applicative (Pipe i o e m) where
  pure = Pipe . return . Done
  (<*>) = ap

-- | Sequencing: run the first pipe to completion, then feed its result to
-- the continuation.
instance Monad m => Monad (Pipe i o e m) where
  return = pure
  m >>= f = Pipe $ do
    step <- runPipe m
    case step of
      -- finished: carry on with the rest of the computation
      Done x -> runPipe $ f x
      -- failed: the error short-circuits, the continuation is never run
      Error e -> return $ Error e
      -- suspended on a pull/push: stay suspended, and attach the
      -- continuation to whatever the pipe does once it is resumed
      Pull c -> return $ Pull (\i -> c i >>= f)
      Push o c -> return $ Push o (\i -> c i >>= f)
-- | Run an action of the base monad inside a pipe; the pipe finishes with
-- the action's result.
instance MonadTrans (Pipe i o e) where
  lift = Pipe . liftM Done
-- | Run an IO action inside a pipe. Any exception the action throws is
-- caught and turned into a pipe error instead of escaping.
instance MonadIO m => MonadIO (Pipe i o SomeException m) where
  liftIO action = Pipe $ do
    outcome <- liftIO (try action)
    either (return . Error) (return . Done) outcome
-- | The error a pipe fails with when a pull or push is answered with 'EOF'.
data EOFError = EOFError
  deriving (Show, Typeable)

instance Exception EOFError
-- pull a value
--
-- Suspends the pipe with a 'Pull' request. On resumption a 'Success' yields
-- the value, an 'Err' re-raises the error, and 'EOF' fails with 'EOFError'.
pull :: Monad m => Pipe i o SomeException m i
pull = Pipe (return (Pull return)) >>= onResult
  where
    onResult (Err e) = pipeErr e
    onResult (Success i) = return i
    onResult EOF = pipeErr (SomeException EOFError)
-- push a value
--
-- Suspends the pipe with a 'Push' request carrying @o@. On resumption a
-- 'Success' completes normally, an 'Err' re-raises the error, and 'EOF'
-- fails with 'EOFError'.
push :: Monad m => o -> Pipe i o SomeException m ()
push o = Pipe (return (Push o return)) >>= onResult
  where
    onResult (Err e) = pipeErr e
    onResult (Success ()) = return ()
    onResult EOF = pipeErr (SomeException EOFError)
-- throw an error
--
-- The resulting pipe performs no effects and immediately fails with @e@.
pipeErr :: Monad m => e -> Pipe i o e m a
pipeErr = Pipe . return . Error
-- catch an error
--
-- Runs @pipe@; if it fails at any point, including after being resumed
-- from a pull or a push, the error is passed to @handler@ and the pipe
-- continues as whatever the handler returns.
pipeCatch :: Monad m => Pipe i o e m a -> (e -> Pipe i o e m a) -> Pipe i o e m a
pipeCatch pipe handler = Pipe (runPipe pipe >>= recover)
  where
    -- keep the handler installed on the rest of a suspended pipe
    guarded rest = pipeCatch rest handler

    recover (Done a) = return (Done a)
    recover (Error e) = runPipe (handler e)
    recover (Pull c) = return (Pull (guarded . c))
    recover (Push o c) = return (Push o (guarded . c))
-- create big pipes from small pipes
--
-- Connects the output of the left pipe to the input of the right pipe.
-- Each round runs one step of both sides and then decides how to proceed.
-- The case alternatives overlap, so their order is significant.
($>) :: Monad m => Pipe i x e m a -> Pipe x o e m b -> Pipe i o e m (a, b)
upstream $> downstream = Pipe $ do
  upStep <- runPipe upstream
  downStep <- runPipe downstream
  -- Re-wrap an already computed step so its effects are not run again.
  let heldUp = Pipe (return upStep)
      heldDown = Pipe (return downStep)
  runPipe $ case (upStep, downStep) of
    -- downstream produces output: forward it out of the combined pipe
    (_, Push out k) ->
      Pipe (return (Push out return)) >>= \ack -> heldUp $> k ack
    -- upstream failed: report the error, notifying a waiting downstream
    (Error e, Error _) -> pipeErr e
    (Error e, Done _) -> pipeErr e
    (Error e, Pull k) -> pipeErr e $> k (Err e)
    -- downstream failed: notify a suspended upstream, then report
    (Pull k, Error e) -> k (Err e) $> pipeErr e
    (Push _ k, Error e) -> k (Err e) $> pipeErr e
    (Done _, Error e) -> pipeErr e
    -- upstream offers a value that downstream is waiting for: hand it over
    (Push out k, Pull k') -> k (Success ()) $> k' (Success out)
    -- upstream needs input: request it from outside the combined pipe
    (Pull k, _) ->
      Pipe (return (Pull return)) >>= \fed -> k fed $> heldDown
    -- one side has finished: tell the other side there is nothing more
    (Done _, Pull k) -> heldUp $> k EOF
    (Push _ k, Done _) -> k EOF $> heldDown
    -- both finished: pair up the results
    (Done x, Done y) -> Pipe (return (Done (x, y)))
-- enumerate the characters in a file
--
-- Opens @path@ for reading and pushes its characters downstream one at a
-- time. The loop never finishes on its own; it only stops through an error
-- (raised by the read or by the push), at which point the handle is closed
-- and the error is re-raised.
enumFile :: String -> Pipe () Char SomeException IO ()
enumFile path = do
  hdl <- liftIO (openFile path ReadMode)
  forever (liftIO (hGetChar hdl) >>= push)
    `pipeCatch` (\err -> liftIO (hClose hdl) >> pipeErr err)
-- dump a stream of characters into a file
--
-- Opens @path@ for writing and writes every character pulled from upstream.
-- The loop never finishes on its own; it only stops through an error
-- (raised by the pull or by the write), at which point the handle is closed
-- and the error is re-raised.
iterFile :: String -> Pipe Char () SomeException IO ()
iterFile path = do
  hdl <- liftIO (openFile path WriteMode)
  forever (pull >>= liftIO . hPutChar hdl)
    `pipeCatch` (\err -> liftIO (hClose hdl) >> pipeErr err)
-- take a complete pipeline and run it
--
-- Runs a single step of the pipe. A finished pipe gives 'Right', a failed
-- pipe gives 'Left'. A pipe that still wants to pull or push is not a
-- complete pipeline and aborts with 'error'.
runPipeline :: Monad m => Pipe () () e m a -> m (Either e a)
runPipeline pipe = runPipe pipe >>= settle
  where
    settle (Done x) = return (Right x)
    settle (Error e) = return (Left e)
    settle (Pull _) = error "unexpected pull"
    settle (Push _ _) = error "unexpected push"
-- lift a pure function into a pipe
--
-- Repeatedly pulls a value, applies @fn@ to it and pushes the result. The
-- loop never finishes normally; it ends only when a pull or push fails.
createPipe :: Monad m => (i -> o) -> Pipe i o SomeException m a
createPipe fn = forever $ do
  input <- pull
  push (fn input)
-- read the characters from src, capitalize them, then dump them into dst
--
-- The pipeline always ends through the error channel: enumFile loops until
-- a read or push fails, so running out of input surfaces as an end-of-file
-- error. Those are treated as normal completion. Any other error (for
-- example a file that cannot be opened) is reported on stderr and the
-- program exits with a failure status instead of being silently discarded.
main :: IO ()
main = do
  outcome <- runPipeline (enumFile "src" $> createPipe toUpper $> iterFile "dst")
  case outcome of
    Right _ -> return ()
    Left err
      | isEndOfInput err -> return ()
      | otherwise -> do
          hPutStrLn stderr ("pipeline failed: " ++ show err)
          exitFailure
  where
    -- True for the end-of-file IOError raised by hGetChar and for the
    -- EOFError raised by pull/push when the neighbouring pipe has finished.
    isEndOfInput err =
      case (fromException err, fromException err) of
        (Just ioErr, _) -> isEOFError ioErr
        (_, Just EOFError) -> True
        _ -> False
@dagit, This is obviously just an example. In the real world I'd use Text or ByteString. I'd read the file in something like 4kB chunks, transform the data in bulk and write it in bulk too. I did consider adding the ability to "unpull" so that you could pull part of a ByteString for example, but I haven't made a firm decision about it yet.
@danielwaterworth, I think making this example a bit more "real-world" would help people assess how good this solution is. For example, if you changed it to read in chunks but still wanted to process per char, how much impact would that have on the surrounding code? Can you get away with only changing enumFile? What happens if I don't fully process a chunk? Can I "rechunk" the data? (E.g., go from 4 kB chunks to lines and maintain composability; iteratees try to make that possible.)
Edit: fixed some spelling errors
For communicating chunks between coroutines, have a look at the incremental-parser package. It solves the common problem with the chunks, namely that the producer's and the consumer's ideas of chunk boundaries rarely agree. The consumer can specify the grammar of the chunk it's interested in, so there's no need to "unpull" anything.
hGetChar will be a really, really slow way to get data out of a file (similar objection to hPutChar). How would your example change if chunks of chars were read/written instead of single chars?