Guest User

Untitled

a guest
Jul 21st, 2018
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.36 KB | None | 0 0
  1. import logging
  2. import simplejson
  3. from twisted.internet import reactor, defer
  4. from stompest.async import StompConfig, StompCreator
  5.  
  class IncrementTransformer(object):
      """Read JSON messages from one STOMP queue, bump ``count``, republish.

      Messages are taken from ``IN_QUEUE``; the transformed document is sent to
      ``OUT_QUEUE``. ``ERROR_QUEUE`` is handed to the subscription as its
      ``errorDestination`` (presumably where the library routes messages whose
      handler fails -- confirm against the stompest documentation).
      """

      IN_QUEUE = '/queue/testIn'
      OUT_QUEUE = '/queue/testOut'
      ERROR_QUEUE = '/queue/testTransformerError'

      def __init__(self, config=None):
          """Remember the broker configuration.

          :param config: a ``StompConfig``; when omitted (``None``) a
              configuration for ``localhost`` port ``61613`` is created.
          """
          self.config = StompConfig('localhost', 61613) if config is None else config

      @defer.inlineCallbacks
      def run(self):
          """Connect to the broker and subscribe ``addOne`` to ``IN_QUEUE``.

          Decorated with ``inlineCallbacks``, so the caller receives a Deferred
          that fires once the subscription call has been made.
          """
          subscription_headers = {
              # client-individual mode is only supported in AMQ >= 5.2 but is
              # necessary for concurrent processing
              'ack': 'client-individual',
              # the maximum number of messages the broker will let you work on
              # at the same time
              'activemq.prefetchSize': 100,
          }

          # Establish the connection, then subscribe to the inbound queue.
          creator = StompCreator(self.config)
          connection = yield creator.getConnection()
          connection.subscribe(
              self.IN_QUEUE,
              self.addOne,
              subscription_headers,
              errorDestination=self.ERROR_QUEUE
          )

      def addOne(self, stomp, frame):
          """Increment the ``count`` field of one message and forward it.

          The frame body is decoded as JSON, its ``count`` entry is increased by
          one, and the re-encoded document is sent to ``OUT_QUEUE`` via ``stomp``.

          NOTE: you can return a Deferred here (this implementation returns
          nothing).
          """
          payload = simplejson.loads(frame['body'])
          payload['count'] += 1
          outgoing = simplejson.dumps(payload)
          stomp.send(self.OUT_QUEUE, outgoing)
  37.  
  if __name__ == '__main__':
      # Script entry point: start the transformer inside the Twisted reactor.
      logging.basicConfig(level=logging.DEBUG)

      transformer = IncrementTransformer()

      def _abort(failure):
          # run() returns a Deferred; without an errback a failed connection or
          # subscription would go unreported and the reactor would idle forever.
          logging.error('IncrementTransformer failed to start:\n%s',
                        failure.getTraceback())
          reactor.stop()

      def _start():
          # Invoked once the reactor is running, so reactor.stop() in _abort is
          # always legal, even if the Deferred has already failed.
          transformer.run().addErrback(_abort)

      reactor.callWhenRunning(_start)
      reactor.run()
Add Comment
Please, Sign In to add comment