Last active
August 29, 2015 14:19
-
-
Save aztecrex/0d22e4e78c403d0486f3 to your computer and use it in GitHub Desktop.
Workflow control from event history API demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| module FlowSpike where | |
| {- | |
| Model a distributed workflow with event-sourced control state. In | |
| this model, the workflow spec is run every time a decision is requred. | |
| A single run transforms a collection of pending and completed tasks | |
| into a maybe result and collection of tasks to start. | |
| This spike tries to expose the control flow problem and propose | |
| an API. | |
| -} | |
| import Control.Monad | |
| import Control.Monad.State | |
| import Control.Applicative | |
| -- Status: | |
| data FlowStat = FlowStat { | |
| getNextId :: Int, -- supply unique task id | |
| getDispatched :: [Int], -- tasks already dispatched | |
| getCompleted :: [(Int, String)], -- tasks already completed | |
| getSkipped :: [Int], -- skipped (not needed but helps visualize) | |
| getReady :: [(Int, String)] -- result (tasks to dispatch) | |
| } deriving (Show) | |
| -- initialize status with dispatched and completed tasks | |
| initStat :: [Int] -> [(Int,String)] -> FlowStat | |
| initStat ds cs = FlowStat 0 ds cs [] [] | |
| -- retrieve and increment id supply | |
| _incr :: FlowStat -> (Int, FlowStat) | |
| _incr (FlowStat i ds cs ss rs) = (i, FlowStat (i+1) ds cs ss rs) | |
| -- lookup dispatched task | |
| _dispatched :: Int -> FlowStat -> (Bool, FlowStat) | |
| _dispatched idx s = (any (==idx) $ getDispatched s, s) | |
| -- lookup completed task | |
| _complete :: Int -> FlowStat -> (Maybe String, FlowStat) | |
| _complete idx s = (lookup idx $ getCompleted s, s) | |
| -- schedule a new task | |
| _schedule :: Int -> String -> FlowStat -> ((), FlowStat) | |
| _schedule idx t (FlowStat i ds cs ss rs) = | |
| ((), FlowStat i ds cs ss ((idx, t):rs)) | |
| -- skip task that cannot yet be run | |
| _skip :: Int -> FlowStat -> ((), FlowStat) | |
| _skip idx (FlowStat i ds cs ss rs) = ((), FlowStat i ds cs (idx:ss) rs) | |
| -- wrap flow status in a State and define state ops | |
| type Flow = State FlowStat | |
| nextIdx :: Flow Int | |
| nextIdx = state $ _incr | |
| complete :: Int -> Flow (Maybe String) | |
| complete idx = state $ _complete idx | |
| dispatched :: Int -> Flow Bool | |
| dispatched idx = state $ _dispatched idx | |
| schedule :: Int -> String -> Flow () | |
| schedule idx arg = state $ _schedule idx arg | |
| skip :: Int -> Flow () | |
| skip idx = state $ _skip idx | |
| -- Dispatch a task if ready. | |
| dispatch :: Maybe String -> Flow (Maybe String) | |
| dispatch t = | |
| -- always issue an id for the operation so task ids stay in sync | |
| nextIdx >>= (\taskId -> | |
| case t of | |
| Nothing -> -- the argument is pending, skip | |
| skip taskId >> | |
| return Nothing | |
| Just x -> | |
| complete taskId >>= (\completed -> | |
| case completed of | |
| Just r -> -- task was completed, move on | |
| return completed | |
| Nothing -> -- task is not completed, is it dispatched? | |
| dispatched taskId >>= (\pending -> | |
| if pending then | |
| return Nothing -- scheduled, keep waiting | |
| else | |
| schedule taskId x >> -- not scheduled, do so | |
| return Nothing | |
| ) | |
| ) | |
| ) | |
| aif :: Applicative f => f Bool -> f a -> f a -> f a | |
| aif fp fl fr = cond <$> fp <*> fl <*> fr where | |
| cond p l r = if p then l else r | |
| (^++) :: Applicative f => f [a] -> f [a] -> f [a] | |
| (^++) = liftA2 (++) | |
| (^==) :: (Applicative f, Eq a) => f a -> f a -> f Bool | |
| (^==) = liftA2 (==) | |
| apiUsageSample :: Flow ([Maybe String]) | |
| -- Ideally, I would like to avoid unwrapping results or lifting functions into | |
| -- Maybe when working inside the do block. | |
| apiUsageSample = do | |
| a <- dispatch $ pure "argA" | |
| b <- dispatch $ pure "argB" | |
| let r1 = a ^++ b -- !!! any way to simply use ++ ? | |
| -- c <- dispatch $ aif (a ^== pure "Hi") (pure "yes") (pure "no") | |
| -- !!! pure this, lift that | |
| c <- case (a ^== pure "Hi") of | |
| -- !!! bleh. all this unwrapping and re-wrapping has got to go | |
| Just True -> dispatch $ pure "yes" | |
| Just False -> dispatch $ pure "no" | |
| _ -> dispatch Nothing | |
| r2 <- dispatch c | |
| r3 <- dispatch r1 | |
| return [r1,r2,r3] | |
| runSample = runState apiUsageSample | |
| {- | |
| The sample workflow specification above is used to compute a collection | |
| of tasks to be started from collections of start and completion events. | |
| A workflow for the specification is kicked off by providing an empty status | |
| (progress0 below). The specification is run to produce a collection of tasks | |
| to start. Presumably, those tasks are started by the calling application. | |
| Eventually, one or more of the scheduled tasks completes and the application | |
| runs the workflow specification with updated status to determine if more | |
| tasks should be started. This cycle of starting tasks then running the | |
| results through the specification is repeated until a stop condition | |
| is detected. | |
| The progress definitions below represent workflow state at several points | |
| in a workflow's lifetime. | |
| -} | |
| -- nothing dispatched | |
| progress0 = initStat [] [] | |
| -- a and b dispatched, nothing completed | |
| progress1 = initStat [0,1] [] | |
| -- b completed | |
| progress2 = initStat [0,1] [(1," there")] | |
| -- a and b completed (r1 can be computed) | |
| progress3 = initStat [0,1] [(1," there"),(0,"Hi")] | |
| -- a and b completed, r2 and c dispatched | |
| progress4 = initStat [0,1,2,4] [(1," there"),(0,"Hi")] | |
| -- r2 and c completed | |
| progress5 = initStat [0,1,2,4] [(1," there"),(0,"Hi"),(4,"OK"),(2,"NACK")] | |
| -- r3 dispatched and completed | |
| progress6 = initStat [0,1,2,4,3] [(1," there"),(0,"Hi"),(4,"OK"),(2,"NACK"), | |
| (3,"Oh boy")] | |
| -- run the workflow specification at each defined state | |
| demo = mapM (print . runSample) [progress0, progress1, progress2, progress3, | |
| progress4, progress5, progress6] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment