Guest User

Untitled

a guest
May 29th, 2018
134
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.76 KB | None | 0 0
  1. import simplejson as json
  2. from copy import deepcopy
  3.  
  4. class FlowExpressionId(object):
  5. """
  6. The FlowExpressionId (fei for short) is an process expression identifier.
  7. Each expression when instantiated gets a unique fei.
  8.  
  9. Feis are also used in workitems, where the fei is the fei of the
  10. [participant] expression that emitted the workitem.
  11.  
  12. Feis can thus indicate the position of a workitem in a process tree.
  13.  
  14. Feis contain four pieces of information :
  15.  
  16. * wfid : workflow instance id, the identifier for the process instance
  17. * sub_wfid : the identifier for the sub process within the main instance
  18. * expid : the expression id, where in the process tree
  19. * engine_id : only relevant in multi engine scenarii (defaults to 'engine')
  20. """
  21.  
  22. CHILD_SEP = '_'
  23.  
  24. def __init__(self, h):
  25. self._h = deepcopy(h)
  26.  
  27. def __getitem__(self, key):
  28. return self._h[key]
  29.  
  30. def expid(self):
  31. return self._h['expid']
  32.  
  33. def wfid(self):
  34. return self._h['wfid']
  35.  
  36. def sub_wfid(self):
  37. return self._h['sub_wfid']
  38.  
  39. def engine_id(self):
  40. return self._h['engine_id']
  41.  
  42. def expid(self):
  43. return self._h['expid']
  44.  
  45. def to_storage_id(self):
  46. return "%s!%s!%s" % (self._h['expid'], self._h['sub_wfid'], self._h['wfid'])
  47.  
  48. def child_id(self):
  49. """
  50. Returns the last number in the expid. For instance, if the expid is
  51. '0_5_7', the child_id will be '7'.
  52. """
  53. try:
  54. return int(self._h.expid.split(CHILD_SEP)[-1])
  55. except ValueError:
  56. return None
  57.  
  58. def direct_child(self, other_fei):
  59.  
  60. for k in [ "sub_wfid", "wfid", "engine_id" ] :
  61. if self._h[k] != other_fei[k] : return False
  62.  
  63. pei = join(CHILD_SEP, reverse(split(CHILD_SEP, other_fei['expid'])))
  64. if pei == self._h['expid']: return True
  65. return False
  66.  
  67. class Workitem(object):
  68. """
  69. A workitem can be thought of an "execution token", but with a payload
  70. (fields).
  71.  
  72. The payload/fields MUST be JSONifiable.
  73. """
  74.  
  75. def __init__(self, msg):
  76. self._h = json.loads(msg)
  77. self._fei = FlowExpressionId(self._h['fei'])
  78.  
  79. def to_h(self):
  80. "Returns the underlying Hash instance."
  81. return self._h
  82.  
  83. def sid(self):
  84. """
  85. The string id for this workitem (something like "0_0!!20100507-wagamama").
  86. Not implemented.
  87. """
  88. return self._fei.to_storage_id()
  89.  
  90.  
  91. def wfid(self):
  92. """
  93. Returns the "workflow instance id" (unique process instance id) of
  94. the process instance which issued this workitem.
  95. """
  96. return self._fei.wfid
  97.  
  98. def fei(self):
  99. "Returns a Ruote::FlowExpressionId instance."
  100. return FlowExpressionId(self._h['fei'])
  101.  
  102.  
  103. def dup(self):
  104. """Returns a complete copy of this workitem."""
  105. return Workitem(self._h)
  106.  
  107. def participant_name(self):
  108. """
  109. The participant for which this item is destined. Will be nil when
  110. the workitem is transiting inside of its process instance (as opposed
  111. to when it's being delivered outside of the engine).
  112. """
  113. return self._h['participant_name']
  114.  
  115. def fields(self):
  116. "Returns the payload, ie the fields hash."
  117. return self._h['fields']
  118.  
  119. def set_fields(self, fields):
  120. """
  121. Sets all the fields in one sweep.
  122. Remember : the fields must be a JSONifiable hash.
  123. """
  124. self._h['fields'] = fields
  125.  
  126. def result(self):
  127. """
  128. A shortcut to the value in the field named __result__
  129.  
  130. This field is used by the if expression for instance to determine
  131. if it should branch to its 'then' or its 'else'.
  132. """
  133. return self.fields()['__result__']
  134.  
  135. def set_result(self, r):
  136. "Sets the value of the 'special' field __result__"
  137. self.fields()['__result__'] = r
  138.  
  139. def dispatch_at(self):
  140. "When was this workitem dispatched ?"
  141. return self.fields()['dispatched_at']
  142.  
  143. def forget(self):
  144. "Is this workitem forgotten? If so no reply is expected."
  145. return self.fields()['params']['forget']
  146.  
  147. def __eq__ (self, other):
  148. "Warning : equality is based on fei and not on payload !"
  149. if isinstance(other, type(self)): return false
  150. return self._h['fei'] == other.h['fei']
  151.  
  152. def __ne__ (self, other):
  153. if (self == other): return True
  154. return False
  155.  
  156. def hash(self):
  157. "Warning : hash is fei's hash."
  158. return hash(self._h['fei'])
  159.  
  160.  
  161. def lookup(self, key, container_lookup=False):
  162. """
  163. For a simple key
  164. workitem.lookup('toto')
  165. is equivalent to
  166. workitem.fields['toto']
  167. but for a complex key
  168. workitem.lookup('toto.address')
  169. is equivalent to
  170. workitem.fields['toto']['address']
  171. """
  172. ref=self._h['fields']
  173. for k in key.split("."):
  174. if not key in ref:
  175. return None
  176. ref = ref[key]
  177. return ref
  178.  
  179. def lf(self, key, container_lookup=False):
  180. "'lf' for 'lookup field'"
  181. return self.lookup(key, container_lookup)
  182.  
  183. def set_field(self, key, value):
  184. """
  185. Like #lookup allows for nested lookups, #set_field can be used
  186. to set sub fields directly.
  187.  
  188. workitem.set_field('customer.address.city', 'Pleasantville')
  189.  
  190. Warning : if the customer and address field and subfield are not present
  191. or are not hashes, set_field will simply create a "customer.address.city"
  192. field and set its value to "Pleasantville".
  193. """
  194.  
  195. ref=self._h['fields']
  196. ks = key.split(".")
  197. last = ks.pop()
  198. for k in ks:
  199. if not k in ref:
  200. ref[k]={}
  201. ref = ref[k]
  202. ref[last] = value
  203.  
  204.  
  205. def timed_out(self):
  206. "Shortcut for wi.fields['__timed_out__']"
  207. return self._h['fields']['__timed_out__']
  208.  
  209. def error(self):
  210. "Shortcut for wi.fields['__error__']"
  211. return self._h['fields']['__error__']
  212.  
  213. def params(self):
  214. """
  215. Shortcut for wi.fields['params']
  216. When a participant is invoked in the process definition as
  217. participant_name :ref => 'toto', :task => 'x"
  218. then when the participant's consume() is executed
  219. workitem.params
  220. contains
  221. { 'ref' => 'toto', 'task' => 'x' }
  222. """
  223. return self._h['fields']['params']
  224.  
  225.  
  226.  
  227.  
  228.  
  229.  
  230.  
  231.  
  232.  
  233. from amqplib import client_0_8 as amqp
  234. from workitem import Workitem, FlowExpressionId
  235. import simplejson as json
  236.  
  237. class RuoteParticipant:
  238.  
  239. def __init__(self, ruote_queue,
  240. amqp_host = "amqpvm", amqp_user = "ruote",
  241. amqp_pass = "ruote", amqp_vhost = "ruote-test"):
  242.  
  243. self._conn = amqp.Connection(host=amqp_host, userid=amqp_user,
  244. password=amqp_pass, virtual_host=amqp_vhost,
  245. insist=False)
  246.  
  247. self._chan = self._conn.channel()
  248.  
  249. self._chan.queue_declare(queue=ruote_queue, durable=True,
  250. exclusive=False, auto_delete=False)
  251.  
  252. self._chan.exchange_declare(exchange="", type="direct", durable=True,
  253. auto_delete=False)
  254.  
  255. self._chan.queue_bind(queue=ruote_queue, exchange="",
  256. routing_key=ruote_queue)
  257.  
  258. self._chan.basic_consume(queue=ruote_queue, no_ack=True,
  259. callback=self.recv_callback, consumer_tag="testtag")
  260.  
  261. # Listener approach:
  262. def recv_callback(self, msg):
  263. print 'Received: ' + msg.body
  264. self.workitem = Workitem(msg.body)
  265. self.consume()
  266. if not self.workitem.forget():
  267. self.reply()
  268.  
  269. def consume():
  270. pass
  271.  
  272. def run(self):
  273. while True:
  274. self._chan.wait()
  275.  
  276. def finish(self):
  277. self._chan.basic_cancel("testtag")
  278. self._chan.close()
  279.  
  280.  
  281. def reply(self):
  282. msg = amqp.Message(json.dumps(self.workitem.to_h()))
  283. # delivery_mode=2 is persistent
  284. msg.properties["delivery_mode"] = 2
  285.  
  286. # Publish the message.
  287. # Notice that this is sent to the anonymous/'' exchange (which is
  288. # different to 'amq.direct') with a routing_key for the queue
  289. self._chan.basic_publish(msg, exchange='', routing_key='ruote_workitems')
  290.  
  291.  
  292. if __name__ == "__main__":
  293. class MyPart(RuoteParticipant):
  294. def consume(self):
  295. wi = self.workitem
  296. print "Got a workitem"
  297. print json.dumps(wi.to_h(), indent=4)
  298. wi.set_field("image.size", 1000)
  299. wi.set_result(True)
  300.  
  301. p = MyPart("sizer")
  302. p.run()
Add Comment
Please, Sign In to add comment