Last active
August 29, 2015 14:25
-
-
Save pchiusano/d6820783899bd9badef9 to your computer and use it in GitHub Desktop.
Stream processing API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# Language GADTs #-} | |
module Chain where | |
data Chain k a b where | |
Empty :: Chain k a a | |
Chain :: k a x -> Chain k x b -> Chain k a b |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# Language ExistentialQuantification #-} | |
module Free where | |
import Control.Monad | |
data Free f a | |
= Pure a | |
| Eval (f a) | |
| forall x . Bind (Either (f x) x) (x -> Free f a) | |
instance Functor (Free f) where | |
fmap = liftM | |
instance Applicative (Free f) where | |
pure = return | |
(<*>) = ap | |
instance Monad (Free f) where | |
return = Pure | |
Bind h t >>= f = Bind h ((=<<) f . t) | |
Pure a >>= f = Bind (Right a) f | |
Eval fa >>= f = Bind (Left fa) f | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# Language GADTs #-} | |
{-# Language ScopedTypeVariables #-} | |
module Process where | |
import Chain | |
import Control.Applicative | |
import Control.Monad | |
import Data.Foldable | |
import Data.List | |
import Data.Map (Map) | |
import Free (Free) | |
import qualified Data.Map as Map | |
import qualified Free | |
type Err = String | |
type ID = Int | |
data Process f w where | |
Emits :: [w] -> Process f w | |
Eval :: f w -> Process f w | |
Bind :: Process f w0 -> (w0 -> Process f w) -> Process f w | |
Append :: Process f w -> Process f w -> Process f w | |
Fail :: Err -> Process f w | |
Scope :: (ID -> Process f w) -> Process f w | |
Acquire :: ID -> f (r, f ()) -> Process f r | |
Release :: ID -> Process f w | |
OnError :: Process f w -> (Err -> Process f w) -> Process f w | |
instance Functor (Process f) where | |
fmap = liftM | |
instance Applicative (Process f) where | |
pure = return | |
(<*>) = ap | |
instance Alternative (Process f) where | |
(<|>) = mplus | |
empty = mzero | |
instance Monad (Process f) where | |
return w = Emits [w] | |
(>>=) = Bind | |
instance MonadPlus (Process f) where | |
mzero = Emits [] | |
mplus = Append | |
data P f w1 w2 = P (w1 -> Process f w2) [Process f w1] [Process.Err -> Process f w1] | |
run :: forall f w b . (b -> w -> b) -> b -> Process f w -> Free f (Either Err b) | |
run g z p = go 0 Map.empty p z Empty where | |
cleanup :: Map ID (f ()) -> Free f () | |
cleanup tracked = traverse_ Free.Eval (Map.elems tracked) | |
go :: ID -> Map ID (f ()) -> Process f w0 -> b -> Chain (P f) w0 w -> Free f (Either Err b) | |
go fresh tracked p z k = case p of | |
Fail err -> case k of | |
Empty -> pure (Left err) | |
Chain (P _ _ []) tk -> go fresh tracked (Fail err) z tk | |
Chain (P hk fbs (e:handlers)) tk -> go fresh tracked (e err) z (P hk fbs handlers `Chain` tk) | |
Emits [] -> case k of | |
Empty -> cleanup tracked *> pure (Right z) | |
Chain (P hk fbs handlers) tk -> case fbs of | |
[] -> go fresh tracked empty z tk | |
(hfb:fbs) -> go fresh tracked hfb z (P hk fbs handlers `Chain` tk) | |
Emits w -> case k of | |
Empty -> go fresh tracked empty (foldl' g z w) Empty | |
Chain (P hk fbs handlers) tk -> go fresh tracked (msum $ map hk w) z tk | |
Eval fw -> Free.Eval fw >>= \w -> go fresh tracked (Emits [w]) z k | |
Bind x f -> go fresh tracked x z (P f [] [] `Chain` k) | |
Append p1 p2 -> go fresh tracked p1 z (P pure [p2] [] `Chain` k) | |
Scope s -> go (fresh+1) tracked (s fresh) z k | |
Acquire id res -> Free.Eval res >>= \(r, cleanup) -> | |
go fresh (Map.insert id cleanup tracked) (pure r) z k | |
Release id -> maybe (pure ()) Free.Eval (Map.lookup id tracked) *> | |
go fresh (Map.delete id tracked) empty z k | |
OnError p handler -> go fresh tracked p z (P pure [] [handler] `Chain` k) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# Language GADTs #-} | |
{-# Language ScopedTypeVariables #-} | |
module Pull where | |
import Control.Applicative | |
import Control.Monad | |
import Chain (Chain (Chain)) | |
import Data.Foldable | |
import Data.Set (Set) | |
import Data.List | |
import Process (Process, ID) | |
import qualified Chain | |
import qualified Data.Set as Set | |
import qualified Process as Process | |
data Pull f w r where | |
Pure :: r -> Pull f w r | |
Eval :: f r -> Pull f w r | |
Bind :: Pull f w r0 -> (r0 -> Pull f w r) -> Pull f w r | |
Write :: Process f w -> Pull f w () | |
Scope :: (ID -> Pull f w r) -> Pull f w r | |
Track :: ID -> Pull f w () | |
Acquire :: ID -> f (r, f ()) -> Pull f w r | |
Release :: ID -> Pull f w () | |
Done :: Pull f w r | |
Or :: Pull f w r -> Pull f w r -> Pull f w r | |
Fail :: Process.Err -> Pull f w r | |
OnError :: Pull f w r -> (Process.Err -> Pull f w r) -> Pull f w r | |
instance Functor (Pull f w) where | |
fmap = liftM | |
instance Applicative (Pull f w) where | |
pure = return | |
(<*>) = ap | |
instance Alternative (Pull f w) where | |
(<|>) = mplus | |
empty = mzero | |
instance Monad (Pull f w) where | |
return = Pure | |
(>>=) = Bind | |
instance MonadPlus (Pull f w) where | |
mzero = Done | |
mplus = Or | |
data P f w a b = P (a -> Pull f w b) [Pull f w a] [Process.Err -> Pull f w a] | |
-- run $ await p >>= Step h (Handle t) -> pure $ Emits h <|> t | |
-- == | |
-- p | |
run :: Pull f w r -> Process f w | |
run p = go Set.empty p (Chain (P (const $ pure ()) [] []) Chain.Empty) where | |
cleanup :: Set ID -> Process f w | |
cleanup resources = foldl' (\tl hd -> Process.Release hd <|> tl) empty (toList resources) | |
go :: Set ID -> Pull f w r -> Chain (P f w) r () -> Process f w | |
go tracked p k = case p of | |
Done -> case k of | |
Chain.Empty -> cleanup tracked | |
Chain (P _ [] _) tk -> cleanup tracked | |
Chain (P th (fbh : fbt) errHandlers) tk -> go tracked fbh (Chain (P th fbt errHandlers) tk) | |
Fail err -> case k of | |
Chain.Empty -> cleanup tracked <|> Process.Fail err | |
Chain (P hk fallbacks errHandlers) tk -> case errHandlers of | |
[] -> go tracked (Fail err) tk | |
(eh:errHandlers) -> go tracked (eh err) (P hk fallbacks errHandlers `Chain` tk) | |
Pure r -> case k of | |
Chain.Empty -> cleanup tracked | |
Chain (P hk _ _) tk -> go tracked (hk r) tk | |
OnError p handler -> go tracked p (P pure [] [handler] `Chain` k) | |
Eval fr -> Process.Eval fr >>= \r -> go tracked (pure r) k | |
Bind x f -> go tracked x (Chain (P f [] []) k) | |
Write w -> w <|> go tracked (pure ()) k | |
Scope s -> Process.Scope (\id -> go tracked (s id) k) | |
Track id -> go (Set.insert id tracked) (pure ()) k | |
Acquire id res -> Process.Acquire id res >>= \r -> go tracked (pure r) k | |
Release id -> Process.Release id <|> go (Set.delete id tracked) (pure ()) k | |
Or Done p -> go tracked p k | |
Or p1 p2 -> go tracked p1 (P pure [p2] [] `Chain` k) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# Language GADTs #-} | |
module Streams where | |
import Control.Applicative | |
import Data.Foldable (asum) | |
import Free (Free) | |
import Process (Process) | |
import Pull (Pull) | |
import qualified Free | |
import qualified Process | |
import qualified Pull | |
data Step h t = Step h t | |
newtype Handle f w = Handle (Process f w) | |
await :: Handle f w -> Pull f w2 (Step [w] (Handle f w)) | |
await (Handle p) = go p [] where | |
go :: Process f w -> [Process f w] -> Pull f w2 (Step [w] (Handle f w)) | |
go p tl = case p of | |
Process.Emits [] -> Pull.Done | |
Process.Emits w -> pure $ Step w (Handle $ asum tl) | |
Process.Eval fw -> Pull.Eval fw >>= \w -> pure $ Step [w] (Handle $ asum tl) | |
Process.Bind (Process.Bind x f) g -> go (Process.Bind x (\x -> f x >>= g)) tl | |
Process.Bind x f -> go x [] >>= \(Step hd (Handle p)) -> | |
case hd of | |
[] -> go (p >>= f) tl | |
(wh:wt) -> go (f wh) (((Process.Emits wt >>= f) <|> (p >>= f)) : tl) | |
Process.Append a b -> go a (b:tl) <|> go b tl | |
Process.Fail e -> Pull.Fail e | |
Process.OnError p handler -> Pull.OnError (go p tl) (\err -> go (handler err) tl) | |
Process.Scope s -> Pull.Scope (\i -> go (s i) tl) | |
Process.Release id -> Pull.Release id *> go (asum tl) [] | |
Process.Acquire id res -> do | |
r <- Pull.Acquire id res | |
Pull.Track id | |
pure $ Step [r] (Handle $ asum tl) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment