Skip to content

Instantly share code, notes, and snippets.

@aztecrex
Last active August 29, 2015 14:19
Show Gist options
  • Select an option

  • Save aztecrex/0d22e4e78c403d0486f3 to your computer and use it in GitHub Desktop.

Select an option

Save aztecrex/0d22e4e78c403d0486f3 to your computer and use it in GitHub Desktop.
Workflow control from event history API demo
module FlowSpike where
{-
Model a distributed workflow with event-sourced control state. In
this model, the workflow spec is run every time a decision is requred.
A single run transforms a collection of pending and completed tasks
into a maybe result and collection of tasks to start.
This spike tries to expose the control flow problem and propose
an API.
-}
import Control.Monad
import Control.Monad.State
import Control.Applicative
-- Status:
data FlowStat = FlowStat {
getNextId :: Int, -- supply unique task id
getDispatched :: [Int], -- tasks already dispatched
getCompleted :: [(Int, String)], -- tasks already completed
getSkipped :: [Int], -- skipped (not needed but helps visualize)
getReady :: [(Int, String)] -- result (tasks to dispatch)
} deriving (Show)
-- initialize status with dispatched and completed tasks
initStat :: [Int] -> [(Int,String)] -> FlowStat
initStat ds cs = FlowStat 0 ds cs [] []
-- retrieve and increment id supply
_incr :: FlowStat -> (Int, FlowStat)
_incr (FlowStat i ds cs ss rs) = (i, FlowStat (i+1) ds cs ss rs)
-- lookup dispatched task
_dispatched :: Int -> FlowStat -> (Bool, FlowStat)
_dispatched idx s = (any (==idx) $ getDispatched s, s)
-- lookup completed task
_complete :: Int -> FlowStat -> (Maybe String, FlowStat)
_complete idx s = (lookup idx $ getCompleted s, s)
-- schedule a new task
_schedule :: Int -> String -> FlowStat -> ((), FlowStat)
_schedule idx t (FlowStat i ds cs ss rs) =
((), FlowStat i ds cs ss ((idx, t):rs))
-- skip task that cannot yet be run
_skip :: Int -> FlowStat -> ((), FlowStat)
_skip idx (FlowStat i ds cs ss rs) = ((), FlowStat i ds cs (idx:ss) rs)
-- wrap flow status in a State and define state ops
type Flow = State FlowStat
nextIdx :: Flow Int
nextIdx = state $ _incr
complete :: Int -> Flow (Maybe String)
complete idx = state $ _complete idx
dispatched :: Int -> Flow Bool
dispatched idx = state $ _dispatched idx
schedule :: Int -> String -> Flow ()
schedule idx arg = state $ _schedule idx arg
skip :: Int -> Flow ()
skip idx = state $ _skip idx
-- Dispatch a task if ready.
dispatch :: Maybe String -> Flow (Maybe String)
dispatch t =
-- always issue an id for the operation so task ids stay in sync
nextIdx >>= (\taskId ->
case t of
Nothing -> -- the argument is pending, skip
skip taskId >>
return Nothing
Just x ->
complete taskId >>= (\completed ->
case completed of
Just r -> -- task was completed, move on
return completed
Nothing -> -- task is not completed, is it dispatched?
dispatched taskId >>= (\pending ->
if pending then
return Nothing -- scheduled, keep waiting
else
schedule taskId x >> -- not scheduled, do so
return Nothing
)
)
)
aif :: Applicative f => f Bool -> f a -> f a -> f a
aif fp fl fr = cond <$> fp <*> fl <*> fr where
cond p l r = if p then l else r
(^++) :: Applicative f => f [a] -> f [a] -> f [a]
(^++) = liftA2 (++)
(^==) :: (Applicative f, Eq a) => f a -> f a -> f Bool
(^==) = liftA2 (==)
apiUsageSample :: Flow ([Maybe String])
-- Ideally, I would like to avoid unwrapping results or lifting functions into
-- Maybe when working inside the do block.
apiUsageSample = do
a <- dispatch $ pure "argA"
b <- dispatch $ pure "argB"
let r1 = a ^++ b -- !!! any way to simply use ++ ?
-- c <- dispatch $ aif (a ^== pure "Hi") (pure "yes") (pure "no")
-- !!! pure this, lift that
c <- case (a ^== pure "Hi") of
-- !!! bleh. all this unwrapping and re-wrapping has got to go
Just True -> dispatch $ pure "yes"
Just False -> dispatch $ pure "no"
_ -> dispatch Nothing
r2 <- dispatch c
r3 <- dispatch r1
return [r1,r2,r3]
runSample = runState apiUsageSample
{-
The sample workflow specification above is used to compute a collection
of tasks to be started from collections of start and completion events.
A workflow for the specification is kicked off by providing an empty status
(progress0 below). The specification is run to produce a collection of tasks
to start. Presumably, those tasks are started by the calling application.
Eventually, one or more of the scheduled tasks completes and the application
runs the workflow specification with updated status to determine if more
tasks should be started. This cycle of starting tasks then running the
results through the specification is repeated until a stop condition
is detected.
The progress definitions below represent workflow state at several points
in a workflow's lifetime.
-}
-- nothing dispatched
progress0 = initStat [] []
-- a and b dispatched, nothing completed
progress1 = initStat [0,1] []
-- b completed
progress2 = initStat [0,1] [(1," there")]
-- a and b completed (r1 can be computed)
progress3 = initStat [0,1] [(1," there"),(0,"Hi")]
-- a and b completed, r2 and c dispatched
progress4 = initStat [0,1,2,4] [(1," there"),(0,"Hi")]
-- r2 and c completed
progress5 = initStat [0,1,2,4] [(1," there"),(0,"Hi"),(4,"OK"),(2,"NACK")]
-- r3 dispatched and completed
progress6 = initStat [0,1,2,4,3] [(1," there"),(0,"Hi"),(4,"OK"),(2,"NACK"),
(3,"Oh boy")]
-- run the workflow specification at each defined state
demo = mapM (print . runSample) [progress0, progress1, progress2, progress3,
progress4, progress5, progress6]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment