Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Last active August 29, 2015 14:25
Show Gist options
  • Save pchiusano/d6820783899bd9badef9 to your computer and use it in GitHub Desktop.
Save pchiusano/d6820783899bd9badef9 to your computer and use it in GitHub Desktop.
Stream processing API
{-# Language GADTs #-}
module Chain where
-- | A type-aligned sequence of @arr@ values: the output type of each element
-- matches the input type of the next, so the whole sequence goes from @a@ to
-- @b@.
data Chain arr a b where
  -- | The empty sequence; its input and output types coincide.
  Empty :: Chain arr a a
  -- | Put one @arr@ step in front of an existing sequence.
  Chain :: arr a mid -> Chain arr mid b -> Chain arr a b
{-# Language ExistentialQuantification #-}
module Free where
import Control.Monad
-- | A free-monad-like syntax tree over the effect type @f@.
--
-- The 'Monad' instance for this type only ever builds 'Bind' nodes whose
-- first field is either an effect still to be run ('Left') or a value that
-- is already available ('Right').
data Free f a
  = -- | A finished computation carrying its result.
    Pure a
  | -- | A single effect whose result is the result of the computation.
    Eval (f a)
  | -- | An effect or a plain value, followed by a continuation.
    forall r . Bind (Either (f r) r) (r -> Free f a)
-- | 'fmap' is derived from the 'Monad' instance below.
instance Functor (Free f) where
  fmap = liftM

-- | 'pure' is the primary definition (canonical form); '<*>' is derived from
-- '>>='.
instance Applicative (Free f) where
  pure = Pure
  (<*>) = ap

-- | Binding never nests to the left: binding onto an existing 'Bind' pushes
-- the new continuation inside the old one, so the head of the tree is always
-- the next effect (or value) to process.
instance Monad (Free f) where
  -- Canonical definition: 'return' delegates to 'pure', not the reverse.
  return = pure
  -- Re-associate to the right instead of stacking another 'Bind' on top.
  Bind h t >>= f = Bind h (\x -> t x >>= f)
  Pure a >>= f = Bind (Right a) f
  Eval fa >>= f = Bind (Left fa) f
{-# Language GADTs #-}
{-# Language ScopedTypeVariables #-}
module Process where
import Chain
import Control.Applicative
import Control.Monad
import Data.Foldable
import Data.List
import Data.Map (Map)
import Free (Free)
import qualified Data.Map as Map
import qualified Free
-- | Error message carried by a failed process.
type Err = String

-- | Identifier under which an acquired resource's finalizer is registered.
type ID = Int

-- | Syntax tree of a stream that emits values of type @o@ and may run effects
-- in @f@. The constructor notes below describe how 'run' interprets each
-- node.
data Process f o where
  -- | Emit the given values; an empty list emits nothing.
  Emits :: [o] -> Process f o
  -- | Run one effect and emit its result.
  Eval :: f o -> Process f o
  -- | Feed every value emitted by the first process to the continuation.
  Bind :: Process f a -> (a -> Process f o) -> Process f o
  -- | Run the first process, then the second.
  Append :: Process f o -> Process f o -> Process f o
  -- | Fail with an error message.
  Fail :: Err -> Process f o
  -- | Run the body with an 'ID' supplied by the interpreter's counter.
  Scope :: (ID -> Process f o) -> Process f o
  -- | Run the effect, which yields a resource together with its finalizer;
  -- the finalizer is registered under the given 'ID' and the resource is
  -- emitted.
  Acquire :: ID -> f (r, f ()) -> Process f r
  -- | Run the finalizer registered under the given 'ID' (if any) and forget
  -- it; emits nothing.
  Release :: ID -> Process f o
  -- | Run the process; on failure continue with the handler's result.
  OnError :: Process f o -> (Err -> Process f o) -> Process f o
-- | 'fmap' is derived from the 'Monad' instance below.
instance Functor (Process f) where
  fmap = liftM

-- | 'pure' emits exactly one value. It is the primary definition (canonical
-- form); '<*>' is derived from '>>='.
instance Applicative (Process f) where
  pure w = Emits [w]
  (<*>) = ap

-- | 'empty' emits nothing and '<|>' is 'Append'. These are the primary
-- definitions; 'MonadPlus' below delegates to them.
instance Alternative (Process f) where
  empty = Emits []
  (<|>) = Append

-- | '>>=' just records a 'Bind' node; interpretation happens in 'run'.
instance Monad (Process f) where
  -- Canonical definition: 'return' delegates to 'pure', not the reverse.
  return = pure
  (>>=) = Bind

-- | Same operations as the 'Alternative' instance.
instance MonadPlus (Process f) where
  mzero = empty
  mplus = (<|>)
-- | One interpreter stack frame used by 'run', holding in order:
--
-- 1. the continuation applied to values emitted by the process running under
--    this frame,
-- 2. fallback processes to run under the same frame once the current one has
--    nothing more to emit,
-- 3. error handlers, tried front to back, for failures raised under this
--    frame.
data P f w1 w2
  = P
      (w1 -> Process f w2)
      [Process f w1]
      [Process.Err -> Process f w1]
-- | Interpret a 'Process' as a 'Free' program, left-folding every emitted
-- value into an accumulator.
--
-- Arguments: the folding step @g@, the initial accumulator @z@, and the
-- process to run.
--
-- Result: @Right acc@ when the process finishes, or @Left err@ when a
-- failure reaches the bottom of the frame stack unhandled. In both cases
-- every finalizer that is still registered is run first.
run :: forall f w b . (b -> w -> b) -> b -> Process f w -> Free f (Either Err b)
run g z p = go 0 Map.empty p z Empty where
  -- Run every finalizer that is still registered.
  cleanup :: Map ID (f ()) -> Free f ()
  cleanup tracked = traverse_ Free.Eval (Map.elems tracked)

  -- Interpreter loop. State: the next fresh 'ID', the registered finalizers,
  -- the process being run, the accumulator, and the stack of pending frames.
  go :: ID -> Map ID (f ()) -> Process f w0 -> b -> Chain (P f) w0 w -> Free f (Either Err b)
  go fresh tracked p z k = case p of
    Fail err -> case k of
      -- Unhandled failure: release whatever is still held before reporting
      -- it, mirroring the successful-termination path below.
      Empty -> cleanup tracked *> pure (Left err)
      -- This frame has no handler left; unwind to the enclosing frame.
      Chain (P _ _ []) tk -> go fresh tracked (Fail err) z tk
      -- Run the first handler under the same frame minus that handler.
      Chain (P hk fbs (e:handlers)) tk -> go fresh tracked (e err) z (P hk fbs handlers `Chain` tk)
    Emits [] -> case k of
      Empty -> cleanup tracked *> pure (Right z)
      Chain (P hk fbs handlers) tk -> case fbs of
        -- Frame exhausted: pop it and report "nothing more" to the outer frame.
        [] -> go fresh tracked empty z tk
        -- Otherwise continue with the next fallback under the same frame.
        (hfb:rest) -> go fresh tracked hfb z (P hk rest handlers `Chain` tk)
    Emits w -> case k of
      -- Bottom of the stack: fold the outputs, then terminate via 'Emits []'.
      Empty -> go fresh tracked empty (foldl' g z w) Empty
      -- NOTE(review): this branch discards the frame's pending fallbacks and
      -- handlers. Also, 'msum' rebuilds an 'Append', which pushes an identity
      -- frame, so a non-empty 'Emits' reaching that frame appears to be
      -- re-wrapped indefinitely. Left unchanged; confirm against a runnable
      -- build before relying on it.
      Chain (P hk _fbs _handlers) tk -> go fresh tracked (msum $ map hk w) z tk
    Eval fw -> Free.Eval fw >>= \w -> go fresh tracked (Emits [w]) z k
    Bind x f -> go fresh tracked x z (P f [] [] `Chain` k)
    Append p1 p2 -> go fresh tracked p1 z (P pure [p2] [] `Chain` k)
    -- Hand out the current counter value and bump it.
    Scope s -> go (fresh+1) tracked (s fresh) z k
    -- Register the finalizer, then emit the resource.
    Acquire rid res -> Free.Eval res >>= \(r, finalizer) ->
      go fresh (Map.insert rid finalizer tracked) (pure r) z k
    -- Run the finalizer if one is registered, then forget it.
    Release rid -> maybe (pure ()) Free.Eval (Map.lookup rid tracked) *>
      go fresh (Map.delete rid tracked) empty z k
    OnError body handler -> go fresh tracked body z (P pure [] [handler] `Chain` k)
{-# Language GADTs #-}
{-# Language ScopedTypeVariables #-}
module Pull where
import Control.Applicative
import Control.Monad
import Chain (Chain (Chain))
import Data.Foldable
import Data.Set (Set)
import Data.List
import Process (Process, ID)
import qualified Chain
import qualified Data.Set as Set
import qualified Process as Process
-- | Syntax tree of a computation that may write output of type @o@ (as
-- 'Process' values), run effects in @f@, and produce a result of type @a@.
-- The constructor notes below describe how 'run' compiles each node.
data Pull f o a where
  -- | Finish with a result.
  Pure :: a -> Pull f o a
  -- | Run one effect and use its result.
  Eval :: f a -> Pull f o a
  -- | Sequence a pull with a continuation on its result.
  Bind :: Pull f o x -> (x -> Pull f o a) -> Pull f o a
  -- | Write a process to the output, then continue.
  Write :: Process f o -> Pull f o ()
  -- | Run the body with an 'ID' obtained from 'Process.Scope'.
  Scope :: (ID -> Pull f o a) -> Pull f o a
  -- | Add an 'ID' to the set that is released when the pull terminates.
  Track :: ID -> Pull f o ()
  -- | Acquire a resource via 'Process.Acquire' and return it.
  Acquire :: ID -> f (a, f ()) -> Pull f o a
  -- | Emit a 'Process.Release' for the 'ID' and stop tracking it.
  Release :: ID -> Pull f o ()
  -- | Stop without a result; a pending alternative (see 'Or') may take over.
  Done :: Pull f o a
  -- | Run the first pull; the second is the fallback if the first is 'Done'.
  Or :: Pull f o a -> Pull f o a -> Pull f o a
  -- | Fail with an error message.
  Fail :: Process.Err -> Pull f o a
  -- | Run the pull; on failure continue with the handler's result.
  OnError :: Pull f o a -> (Process.Err -> Pull f o a) -> Pull f o a
-- | 'fmap' is derived from the 'Monad' instance below.
instance Functor (Pull f w) where
  fmap = liftM

-- | 'pure' is the primary definition (canonical form); '<*>' is derived from
-- '>>='.
instance Applicative (Pull f w) where
  pure = Pure
  (<*>) = ap

-- | 'empty' is 'Done' and '<|>' is 'Or'. These are the primary definitions;
-- 'MonadPlus' below delegates to them.
instance Alternative (Pull f w) where
  empty = Done
  (<|>) = Or

-- | '>>=' just records a 'Bind' node; interpretation happens in 'run'.
instance Monad (Pull f w) where
  -- Canonical definition: 'return' delegates to 'pure', not the reverse.
  return = pure
  (>>=) = Bind

-- | Same operations as the 'Alternative' instance.
instance MonadPlus (Pull f w) where
  mzero = empty
  mplus = (<|>)
-- | One compiler stack frame used by 'run', holding in order:
--
-- 1. the continuation applied to the result of the pull running under this
--    frame,
-- 2. fallback pulls to try when the current one is 'Done',
-- 3. error handlers, tried front to back, for failures raised under this
--    frame.
data P f w a b
  = P
      (a -> Pull f w b)
      [Pull f w a]
      [Process.Err -> Pull f w a]
-- | Compile a 'Pull' into the 'Process' of everything it writes. The pull's
-- final result is discarded, and every tracked resource is released when the
-- pull terminates.
--
-- Informal law stated by the original author:
--
-- > run $ await p >>= \(Step h (Handle t)) -> pure $ Emits h <|> t
-- > ==
-- > p
run :: Pull f w r -> Process f w
run p = go Set.empty p (Chain (P (const $ pure ()) [] []) Chain.Empty) where
  -- Emit a 'Process.Release' for every tracked ID.
  cleanup :: Set ID -> Process f w
  cleanup resources = foldl' (\tl hd -> Process.Release hd <|> tl) empty (toList resources)

  -- Compiler loop. State: the tracked IDs, the pull being compiled, and the
  -- stack of pending frames.
  go :: Set ID -> Pull f w r -> Chain (P f w) r () -> Process f w
  go tracked p k = case p of
    Done -> case k of
      Chain.Empty -> cleanup tracked
      -- No fallback in this frame: keep unwinding so that a fallback held by
      -- an enclosing frame (e.g. @(Done >>= f) <|> q@) still gets to run.
      -- This mirrors how 'Fail' propagates outward below.
      Chain (P _ [] _) tk -> go tracked Done tk
      -- Try the next fallback under the same frame minus that fallback.
      Chain (P th (fbh : fbt) errHandlers) tk -> go tracked fbh (Chain (P th fbt errHandlers) tk)
    Fail err -> case k of
      -- Unhandled failure: release tracked resources, then fail.
      Chain.Empty -> cleanup tracked <|> Process.Fail err
      Chain (P hk fallbacks errHandlers) tk -> case errHandlers of
        [] -> go tracked (Fail err) tk
        (eh:ehs) -> go tracked (eh err) (P hk fallbacks ehs `Chain` tk)
    Pure r -> case k of
      Chain.Empty -> cleanup tracked
      -- Success: the frame's fallbacks and handlers are dropped.
      Chain (P hk _ _) tk -> go tracked (hk r) tk
    OnError body handler -> go tracked body (P pure [] [handler] `Chain` k)
    Eval fr -> Process.Eval fr >>= \r -> go tracked (pure r) k
    Bind x f -> go tracked x (Chain (P f [] []) k)
    -- Output first, then the rest of the pull.
    Write w -> w <|> go tracked (pure ()) k
    Scope s -> Process.Scope (\rid -> go tracked (s rid) k)
    Track rid -> go (Set.insert rid tracked) (pure ()) k
    Acquire rid res -> Process.Acquire rid res >>= \r -> go tracked (pure r) k
    Release rid -> Process.Release rid <|> go (Set.delete rid tracked) (pure ()) k
    -- Shortcut: an alternative whose left side is already 'Done'.
    Or Done alt -> go tracked alt k
    Or p1 p2 -> go tracked p1 (P pure [p2] [] `Chain` k)
{-# Language GADTs #-}
module Streams where
import Control.Applicative
import Data.Foldable (asum)
import Free (Free)
import Process (Process)
import Pull (Pull)
import qualified Free
import qualified Process
import qualified Pull
-- | Result of one 'await': a head of type @h@ paired with a tail of type @t@.
data Step h t
  = Step h t

-- | A stream that can be awaited; a thin wrapper around the underlying
-- 'Process'.
newtype Handle f w
  = Handle (Process f w)
-- | Pull the next chunk of output from a 'Handle'.
--
-- Yields a 'Step' holding the chunk and a 'Handle' for the remainder, or is
-- 'Pull.Done' when the process has nothing to emit. Failures in the process
-- surface as 'Pull.Fail'.
await :: Handle f w -> Pull f w2 (Step [w] (Handle f w))
await (Handle p) = go p [] where
  -- @go p tl@ steps @p@; @tl@ lists the processes that follow @p@ and become
  -- the remainder handle once a chunk has been produced.
  go :: Process f w -> [Process f w] -> Pull f w2 (Step [w] (Handle f w))
  go p tl = case p of
    Process.Emits [] -> Pull.Done
    Process.Emits w -> pure $ Step w (Handle $ asum tl)
    Process.Eval fw -> Pull.Eval fw >>= \w -> pure $ Step [w] (Handle $ asum tl)
    -- Re-associate left-nested binds so the innermost source is stepped first.
    Process.Bind (Process.Bind x f) g -> go (Process.Bind x (\y -> f y >>= g)) tl
    Process.Bind x f -> go x [] >>= \(Step hd (Handle rest)) ->
      case hd of
        [] -> go (rest >>= f) tl
        (wh:wt) ->
          -- Everything that still has to go through @f@ after @wh@.
          let later = (Process.Emits wt >>= f) <|> (rest >>= f)
          -- If @f wh@ emits nothing, move on to the deferred work instead of
          -- reporting the whole bind as done (same shape as the 'Append'
          -- case below).
          in go (f wh) (later : tl) <|> go later tl
    -- Step @a@ with @b@ queued behind it; if @a@ is done, step @b@ instead.
    Process.Append a b -> go a (b:tl) <|> go b tl
    Process.Fail e -> Pull.Fail e
    Process.OnError body handler -> Pull.OnError (go body tl) (\err -> go (handler err) tl)
    Process.Scope s -> Pull.Scope (\i -> go (s i) tl)
    Process.Release rid -> Pull.Release rid *> go (asum tl) []
    -- Acquire, register the ID for release on termination, then hand the
    -- resource out as a one-element chunk.
    Process.Acquire rid res -> do
      r <- Pull.Acquire rid res
      Pull.Track rid
      pure $ Step [r] (Handle $ asum tl)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment