Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- module FlowSpike where
- {-
- Model a distributed workflow with event-sourced control state. In
- this model, the workflow spec is run every time a decision is requred.
- A single run transforms a collection of pending and completed tasks
- into a maybe result and collection of tasks to start.
- This spike tries to expose the control flow problem and propose
- an API.
- -}
- import Control.Monad
- import Control.Monad.State
- import Control.Applicative
- -- Status:
- data FlowStat = FlowStat {
- getNextId :: Int, -- supply unique task id
- getDispatched :: [Int], -- tasks already dispatched
- getCompleted :: [(Int, String)], -- tasks already completed
- getSkipped :: [Int], -- skipped (not needed but helps visualize)
- getReady :: [(Int, String)] -- result (tasks to dispatch)
- } deriving (Show)
- -- initialize status with dispatched and completed tasks
- initStat :: [Int] -> [(Int,String)] -> FlowStat
- initStat ds cs = FlowStat 0 ds cs [] []
- -- retrieve and increment id supply
- _incr :: FlowStat -> (Int, FlowStat)
- _incr (FlowStat i ds cs ss rs) = (i, FlowStat (i+1) ds cs ss rs)
- -- lookup dispatched task
- _dispatched :: Int -> FlowStat -> (Bool, FlowStat)
- _dispatched idx s = (any (==idx) $ getDispatched s, s)
- -- lookup completed task
- _complete :: Int -> FlowStat -> (Maybe String, FlowStat)
- _complete idx s = (lookup idx $ getCompleted s, s)
- -- schedule a new task
- _schedule :: Int -> String -> FlowStat -> ((), FlowStat)
- _schedule idx t (FlowStat i ds cs ss rs) =
- ((), FlowStat i ds cs ss ((idx, t):rs))
- -- skip task that cannot yet be run
- _skip :: Int -> FlowStat -> ((), FlowStat)
- _skip idx (FlowStat i ds cs ss rs) = ((), FlowStat i ds cs (idx:ss) rs)
- -- wrap flow status in a State and define state ops
- type Flow = State FlowStat
- nextIdx :: Flow Int
- nextIdx = state $ _incr
- complete :: Int -> Flow (Maybe String)
- complete idx = state $ _complete idx
- dispatched :: Int -> Flow Bool
- dispatched idx = state $ _dispatched idx
- schedule :: Int -> String -> Flow ()
- schedule idx arg = state $ _schedule idx arg
- skip :: Int -> Flow ()
- skip idx = state $ _skip idx
- -- Dispatch a task if ready.
- dispatch :: Maybe String -> Flow (Maybe String)
- dispatch t =
- -- always issue an id for the operation so task ids stay in sync
- nextIdx >>= (\taskId ->
- case t of
- Nothing -> -- the argument is pending, skip
- skip taskId >>
- return Nothing
- Just x ->
- complete taskId >>= (\completed ->
- case completed of
- Just r -> -- task was completed, move on
- return completed
- Nothing -> -- task is not completed, is it dispatched?
- dispatched taskId >>= (\pending ->
- if pending then
- return Nothing -- scheduled, keep waiting
- else
- schedule taskId x >> -- not scheduled, do so
- return Nothing
- )
- )
- )
- aif :: Applicative f => f Bool -> f a -> f a -> f a
- aif fp fl fr = cond <$> fp <*> fl <*> fr where
- cond p l r = if p then l else r
- (^++) :: Applicative f => f [a] -> f [a] -> f [a]
- (^++) = liftA2 (++)
- (^==) :: (Applicative f, Eq a) => f a -> f a -> f Bool
- (^==) = liftA2 (==)
- apiUsageSample :: Flow ([Maybe String])
- -- Ideally, I would like to avoid unwrapping results or lifting functions into
- -- Maybe when working inside the do block.
- apiUsageSample = do
- a <- dispatch $ pure "argA"
- b <- dispatch $ pure "argB"
- let r1 = a ^++ b -- !!! any way to simply use ++ ?
- -- c <- dispatch $ aif (a ^== pure "Hi") (pure "yes") (pure "no")
- -- !!! pure this, lift that
- c <- case (a ^== pure "Hi") of
- -- !!! bleh. all this unwrapping and re-wrapping has got to go
- Just True -> dispatch $ pure "yes"
- Just False -> dispatch $ pure "no"
- _ -> dispatch Nothing
- r2 <- dispatch c
- r3 <- dispatch r1
- return [r1,r2,r3]
- runSample = runState apiUsageSample
- {-
- The sample workflow specification above is used to compute a collection
- of tasks to be started from collections of start and completion events.
- A workflow for the specification is kicked off by providing an empty status
- (progress0 below). The specification is run to produce a collection of tasks
- to start. Presumably, those tasks are started by the calling application.
- Eventually, one or more of the scheduled tasks completes and the application
- runs the workflow specification with updated status to determine if more
- tasks should be started. This cycle of starting tasks then running the
- results through the specification is repeated until a stop condition
- is detected.
- The progress definitions below represent workflow state at several points
- in a workflow's lifetime.
- -}
- -- nothing dispatched
- progress0 = initStat [] []
- -- a and b dispatched, nothing completed
- progress1 = initStat [0,1] []
- -- b completed
- progress2 = initStat [0,1] [(1," there")]
- -- a and b completed (r1 can be computed)
- progress3 = initStat [0,1] [(1," there"),(0,"Hi")]
- -- a and b completed, r2 and c dispatched
- progress4 = initStat [0,1,2,4] [(1," there"),(0,"Hi")]
- -- r2 and c completed
- progress5 = initStat [0,1,2,4] [(1," there"),(0,"Hi"),(4,"OK"),(2,"NACK")]
- -- r3 dispatched and completed
- progress6 = initStat [0,1,2,4,3] [(1," there"),(0,"Hi"),(4,"OK"),(2,"NACK"),
- (3,"Oh boy")]
- -- run the workflow specification at each defined state
- demo = mapM (print . runSample) [progress0, progress1, progress2, progress3,
- progress4, progress5, progress6]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement