Created
February 26, 2012 10:21
-
-
Save danielwaterworth/1915841 to your computer and use it in GitHub Desktop.
Error conscious, pure iteratee library (based on pipes)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE FlexibleInstances, DeriveDataTypeable #-}
import Control.Applicative (Applicative (..))
import Control.Exception
import Control.Monad
import Data.Char
import Data.Typeable
import System.IO

import Control.Monad.Trans
-- | The answer a pipe receives after it asks to pull or push.
--
-- For a pull the payload type @i@ is the value that was read; for a push the
-- payload is @()@ (a bare acknowledgement).
data Result i e
  = Err e      -- ^ the other side failed with error @e@
  | Success i  -- ^ the operation succeeded, carrying its payload
  | EOF        -- ^ the other side has finished; nothing more will flow
  deriving (Eq, Show)
-- | What a pipe looks like after its effects have been run up to the next
-- point where it needs the outside world.
data Step i o e m a
  = Done a
    -- ^ finished normally with a result
  | Pull (Result i e -> Pipe i o e m a)
    -- ^ wants an input; the continuation is resumed with the pull's 'Result'
  | Push o (Result () e -> Pipe i o e m a)
    -- ^ offers an output; the continuation is resumed with the push's 'Result'
  | Error e
    -- ^ failed with an error
-- | The main pipe type: an action in the base monad @m@ that yields the
-- pipe's next 'Step'.
--
-- Type parameters: @i@ input values, @o@ output values, @e@ errors,
-- @m@ base monad, @a@ final result.
--
-- A @newtype@ is used because there is exactly one constructor with one
-- field, so the wrapper carries no runtime cost.
newtype Pipe i o e m a = Pipe
  { runPipe :: m (Step i o e m a)
  }
-- 'Functor' and 'Applicative' are superclasses of 'Monad' since GHC 7.10;
-- both are derived from the 'Monad' instance so there is one source of truth.
instance Monad m => Functor (Pipe i o e m) where
  fmap = liftM

instance Monad m => Applicative (Pipe i o e m) where
  -- A pipe that is immediately 'Done' without running any effect.
  pure = Pipe . return . Done
  (<*>) = ap

-- | Sequencing runs the first pipe one step at a time. The continuation @f@
-- is attached behind every 'Pull' / 'Push' so it fires once the first pipe is
-- 'Done'; an 'Error' short-circuits and @f@ is never called.
instance Monad m => Monad (Pipe i o e m) where
  return = pure
  pipe >>= f = Pipe $ do
    step <- runPipe pipe
    case step of
      Done x -> runPipe (f x)
      Error e -> return (Error e)
      Pull k -> return (Pull (\r -> k r >>= f))
      Push o k -> return (Push o (\r -> k r >>= f))
-- | Run an action of the base monad inside a pipe; its result becomes the
-- result of a pipe that is immediately 'Done'. Nothing is caught here.
instance MonadTrans (Pipe i o e) where
  lift = Pipe . liftM Done
-- | Run an 'IO' action inside a pipe whose error type is 'SomeException'.
--
-- The action is wrapped in 'try', so any exception it throws is turned into
-- the pipe's 'Error' step instead of escaping; a normal result becomes 'Done'.
-- NOTE(review): 'try' at type 'SomeException' catches every exception the
-- action raises, not only I/O errors.
instance MonadIO m => MonadIO (Pipe i o SomeException m) where
  liftIO action = Pipe $ liftM (either Error Done) (liftIO (try action))
-- | The error raised by 'pull' and 'push' when the other side of the pipe
-- answers with end-of-stream.
data EOFError = EOFError
  deriving (Show, Typeable)

instance Exception EOFError
-- | Ask upstream for the next input value.
--
-- Emits a 'Pull' step and interprets the answer: a 'Success' yields the
-- value, an 'Err' fails the pipe with the same error, and 'EOF' fails the
-- pipe with 'EOFError'.
pull :: Monad m => Pipe i o SomeException m i
pull = Pipe (return (Pull answer))
  where
    answer (Success i) = return i
    answer (Err e) = pipeErr e
    answer EOF = pipeErr (SomeException EOFError)
-- | Offer a value to downstream.
--
-- Emits a 'Push' step and interprets the answer: a 'Success' returns @()@,
-- an 'Err' fails the pipe with the same error, and 'EOF' fails the pipe with
-- 'EOFError'.
push :: Monad m => o -> Pipe i o SomeException m ()
push o = Pipe (return (Push o answer))
  where
    answer (Success ()) = return ()
    answer (Err e) = pipeErr e
    answer EOF = pipeErr (SomeException EOFError)
-- | Fail the pipe with the given error: a pipe whose only step is 'Error'.
pipeErr :: Monad m => e -> Pipe i o e m a
pipeErr = Pipe . return . Error
-- | Run a pipe and, if it ends in 'Error', continue with the handler applied
-- to that error.
--
-- The guard is re-installed behind every 'Pull' and 'Push', so it covers the
-- whole remaining run of the pipe. The pipe returned by the handler is not
-- itself guarded.
pipeCatch :: Monad m => Pipe i o e m a -> (e -> Pipe i o e m a) -> Pipe i o e m a
pipeCatch pipe handler = Pipe $ runPipe pipe >>= next
  where
    next (Done a) = return (Done a)
    next (Error e) = runPipe (handler e)
    next (Pull k) = return (Pull (guarded k))
    next (Push o k) = return (Push o (guarded k))

    -- Resume the continuation and keep the same handler in force.
    guarded k r = pipeCatch (k r) handler
-- Same as the default fixity; stated explicitly because chained uses such as
-- @p $> q $> r@ rely on grouping to the left.
infixl 9 $>

-- | Connect two pipes: outputs of the left (upstream) pipe become inputs of
-- the right (downstream) pipe. The composite finishes with both results.
--
-- Each round runs both pipes to their next 'Step' and then dispatches on the
-- pair. A step that is not consumed in a round is re-wrapped with 'return', so
-- its effects are not run a second time. The alternatives are tried top to
-- bottom and several overlap, so their order is significant.
--
-- This operator is unrelated to @Data.Functor.($>)@, which this file does
-- not import.
($>) :: Monad m => Pipe i x e m a -> Pipe x o e m b -> Pipe i o e m (a, b)
up $> down = Pipe $ do
  upStep <- runPipe up
  downStep <- runPipe down
  let upAgain = Pipe (return upStep)
      downAgain = Pipe (return downStep)
  runPipe $ case (upStep, downStep) of
    -- Downstream wants to emit: surface its push to the outside first.
    (_, Push o k) -> do
      reply <- Pipe $ return $ Push o return
      upAgain $> k reply
    -- Both failed, or upstream failed after downstream finished: report the
    -- upstream error.
    (Error e, Error _) -> pipeErr e
    (Error e, Done _) -> pipeErr e
    -- Upstream failed while downstream waits for input: hand it the error.
    (Error e, Pull k) -> pipeErr e $> k (Err e)
    -- Downstream failed: tell a still-active upstream, then keep composing.
    (Pull k, Error e) -> k (Err e) $> pipeErr e
    (Push _ k, Error e) -> k (Err e) $> pipeErr e
    (Done _, Error e) -> pipeErr e
    -- The normal hand-over: upstream's output feeds downstream's pull.
    (Push o k, Pull k') -> k (Success ()) $> k' (Success o)
    -- Upstream needs input: surface its pull to the outside.
    (Pull k, _) -> do
      input <- Pipe $ return $ Pull return
      k input $> downAgain
    -- One side has finished while the other still wants to talk: answer EOF.
    (Done _, Pull k) -> upAgain $> k EOF
    (Push _ k, Done _) -> k EOF $> downAgain
    (Done x, Done y) -> return (x, y)
-- | Push the characters of a file downstream, one at a time.
--
-- Opens @path@ in 'ReadMode', then loops forever reading a character with
-- 'hGetChar' and pushing it. Because the loop is 'forever', this pipe never
-- finishes with a normal result: it ends only when a read or a push fails,
-- at which point the handle is closed and the same error is re-raised.
-- A failure of 'openFile' itself happens before the guard is installed.
enumFile :: FilePath -> Pipe () Char SomeException IO ()
enumFile path = do
  fd <- liftIO $ openFile path ReadMode
  forever (liftIO (hGetChar fd) >>= push)
    `pipeCatch` \e -> liftIO (hClose fd) >> pipeErr e
-- | Write every character received from upstream into a file.
--
-- Opens @path@ in 'WriteMode', then loops forever pulling a character and
-- writing it with 'hPutChar'. Because the loop is 'forever', this pipe never
-- finishes with a normal result: it ends only when a pull or a write fails,
-- at which point the handle is closed and the same error is re-raised.
-- A failure of 'openFile' itself happens before the guard is installed.
iterFile :: FilePath -> Pipe Char () SomeException IO ()
iterFile path = do
  fd <- liftIO $ openFile path WriteMode
  forever (pull >>= liftIO . hPutChar fd)
    `pipeCatch` \e -> liftIO (hClose fd) >> pipeErr e
-- | Run a complete pipeline in its base monad.
--
-- Returns 'Right' with the result when the pipeline is 'Done' and 'Left' with
-- the error when it ends in 'Error'. A pipeline that still tries to pull or
-- push at this level has nothing to talk to; that is treated as a programming
-- mistake and aborts via 'error'.
runPipeline :: Monad m => Pipe () () e m a -> m (Either e a)
runPipeline pipe = runPipe pipe >>= finish
  where
    finish (Done x) = return (Right x)
    finish (Error e) = return (Left e)
    finish (Pull _) = error "unexpected pull"
    finish (Push _ _) = error "unexpected push"
-- | Lift a pure function into a pipe that transforms every value flowing
-- through it.
--
-- Loops forever: pull one input, apply @fn@, push the result. The loop has no
-- normal exit (hence the unconstrained result type @a@); it stops only when
-- 'pull' or 'push' fails.
createPipe :: Monad m => (i -> o) -> Pipe i o SomeException m a
createPipe fn = forever $ do
  input <- pull
  push (fn input)
-- | Read the characters from the file @src@, upper-case them, and write them
-- to the file @dst@.
--
-- The 'Either' produced by 'runPipeline' is the result of 'main' and is not
-- inspected here, so a 'Left' is not reported by this program.
main :: IO (Either SomeException (((), ()), ()))
main =
  runPipeline (enumFile "src" $> createPipe toUpper $> iterFile "dst")
For communicating chunks between coroutines, have a look at the incremental-parser package. It solves the common problem with the chunks, namely that the producer's and the consumer's ideas of chunk boundaries rarely agree. The consumer can specify the grammar of the chunk it's interested in, so there's no need to "unpull" anything.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@danielwaterworth, I think making this example a bit more "real-world" would help people assess how good this solution is. For example, if you changed it to read in chunks but still wanted to process the data per character, how much impact would that have on the surrounding code? Can you get away with only changing `enumFile`? What happens if I don't fully process a chunk? Can I "rechunk" the data (e.g., go from 4 KB chunks to lines) and maintain composability? Iteratees try to make that possible. Edit: fixed some spelling errors