Advertisement
Guest User

Untitled

a guest
May 27th, 2015
233
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.56 KB | None | 0 0
  1. module FlowSpike where
  2.  
  3. {-
  4. Model a distributed workflow with event-sourced control state. In
  5. this model, the workflow spec is run every time a decision is requred.
  6. A single run transforms a collection of pending and completed tasks
  7. into a maybe result and collection of tasks to start.
  8.  
  9. This spike tries to expose the control flow problem and propose
  10. an API.
  11.  
  12. -}
  13.  
  14. import Control.Monad
  15. import Control.Monad.State
  16. import Control.Applicative
  17.  
  18. -- Status:
  19. data FlowStat = FlowStat {
  20. getNextId :: Int, -- supply unique task id
  21. getDispatched :: [Int], -- tasks already dispatched
  22. getCompleted :: [(Int, String)], -- tasks already completed
  23. getSkipped :: [Int], -- skipped (not needed but helps visualize)
  24. getReady :: [(Int, String)] -- result (tasks to dispatch)
  25. } deriving (Show)
  26.  
  27. -- initialize status with dispatched and completed tasks
  28. initStat :: [Int] -> [(Int,String)] -> FlowStat
  29. initStat ds cs = FlowStat 0 ds cs [] []
  30.  
  31. -- retrieve and increment id supply
  32. _incr :: FlowStat -> (Int, FlowStat)
  33. _incr (FlowStat i ds cs ss rs) = (i, FlowStat (i+1) ds cs ss rs)
  34.  
  35. -- lookup dispatched task
  36. _dispatched :: Int -> FlowStat -> (Bool, FlowStat)
  37. _dispatched idx s = (any (==idx) $ getDispatched s, s)
  38.  
  39. -- lookup completed task
  40. _complete :: Int -> FlowStat -> (Maybe String, FlowStat)
  41. _complete idx s = (lookup idx $ getCompleted s, s)
  42.  
  43. -- schedule a new task
  44. _schedule :: Int -> String -> FlowStat -> ((), FlowStat)
  45. _schedule idx t (FlowStat i ds cs ss rs) =
  46. ((), FlowStat i ds cs ss ((idx, t):rs))
  47.  
  48. -- skip task that cannot yet be run
  49. _skip :: Int -> FlowStat -> ((), FlowStat)
  50. _skip idx (FlowStat i ds cs ss rs) = ((), FlowStat i ds cs (idx:ss) rs)
  51.  
  52.  
  53. -- wrap flow status in a State and define state ops
  54. type Flow = State FlowStat
  55.  
  56. nextIdx :: Flow Int
  57. nextIdx = state $ _incr
  58.  
  59. complete :: Int -> Flow (Maybe String)
  60. complete idx = state $ _complete idx
  61.  
  62. dispatched :: Int -> Flow Bool
  63. dispatched idx = state $ _dispatched idx
  64.  
  65. schedule :: Int -> String -> Flow ()
  66. schedule idx arg = state $ _schedule idx arg
  67.  
  68. skip :: Int -> Flow ()
  69. skip idx = state $ _skip idx
  70.  
  71. -- Dispatch a task if ready.
  72. dispatch :: Maybe String -> Flow (Maybe String)
  73. dispatch t =
  74. -- always issue an id for the operation so task ids stay in sync
  75. nextIdx >>= (\taskId ->
  76. case t of
  77. Nothing -> -- the argument is pending, skip
  78. skip taskId >>
  79. return Nothing
  80. Just x ->
  81. complete taskId >>= (\completed ->
  82. case completed of
  83. Just r -> -- task was completed, move on
  84. return completed
  85. Nothing -> -- task is not completed, is it dispatched?
  86. dispatched taskId >>= (\pending ->
  87. if pending then
  88. return Nothing -- scheduled, keep waiting
  89. else
  90. schedule taskId x >> -- not scheduled, do so
  91. return Nothing
  92. )
  93. )
  94. )
  95.  
  96. aif :: Applicative f => f Bool -> f a -> f a -> f a
  97. aif fp fl fr = cond <$> fp <*> fl <*> fr where
  98. cond p l r = if p then l else r
  99.  
  100. (^++) :: Applicative f => f [a] -> f [a] -> f [a]
  101. (^++) = liftA2 (++)
  102.  
  103. (^==) :: (Applicative f, Eq a) => f a -> f a -> f Bool
  104. (^==) = liftA2 (==)
  105.  
  106. apiUsageSample :: Flow ([Maybe String])
  107. -- Ideally, I would like to avoid unwrapping results or lifting functions into
  108. -- Maybe when working inside the do block.
  109. apiUsageSample = do
  110. a <- dispatch $ pure "argA"
  111. b <- dispatch $ pure "argB"
  112. let r1 = a ^++ b -- !!! any way to simply use ++ ?
  113. -- c <- dispatch $ aif (a ^== pure "Hi") (pure "yes") (pure "no")
  114. -- !!! pure this, lift that
  115. c <- case (a ^== pure "Hi") of
  116. -- !!! bleh. all this unwrapping and re-wrapping has got to go
  117. Just True -> dispatch $ pure "yes"
  118. Just False -> dispatch $ pure "no"
  119. _ -> dispatch Nothing
  120. r2 <- dispatch c
  121. r3 <- dispatch r1
  122. return [r1,r2,r3]
  123.  
  124. runSample = runState apiUsageSample
  125.  
  126. {-
  127.  
  128. The sample workflow specification above is used to compute a collection
  129. of tasks to be started from collections of start and completion events.
  130.  
  131. A workflow for the specification is kicked off by providing an empty status
  132. (progress0 below). The specification is run to produce a collection of tasks
  133. to start. Presumably, those tasks are started by the calling application.
  134.  
  135. Eventually, one or more of the scheduled tasks completes and the application
  136. runs the workflow specification with updated status to determine if more
  137. tasks should be started. This cycle of starting tasks then running the
  138. results through the specification is repeated until a stop condition
  139. is detected.
  140.  
  141. The progress definitions below represent workflow state at several points
  142. in a workflow's lifetime.
  143.  
  144. -}
  145.  
  146. -- nothing dispatched
  147. progress0 = initStat [] []
  148.  
  149. -- a and b dispatched, nothing completed
  150. progress1 = initStat [0,1] []
  151.  
  152. -- b completed
  153. progress2 = initStat [0,1] [(1," there")]
  154.  
  155. -- a and b completed (r1 can be computed)
  156. progress3 = initStat [0,1] [(1," there"),(0,"Hi")]
  157.  
  158. -- a and b completed, r2 and c dispatched
  159. progress4 = initStat [0,1,2,4] [(1," there"),(0,"Hi")]
  160.  
  161. -- r2 and c completed
  162. progress5 = initStat [0,1,2,4] [(1," there"),(0,"Hi"),(4,"OK"),(2,"NACK")]
  163.  
  164. -- r3 dispatched and completed
  165. progress6 = initStat [0,1,2,4,3] [(1," there"),(0,"Hi"),(4,"OK"),(2,"NACK"),
  166. (3,"Oh boy")]
  167.  
  168. -- run the workflow specification at each defined state
  169. demo = mapM (print . runSample) [progress0, progress1, progress2, progress3,
  170. progress4, progress5, progress6]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement