Advertisement
Guest User

Untitled

a guest
Oct 22nd, 2016
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.88 KB | None | 0 0
  1. #!/usr/bin/env python
  2. from boto import sqs
  3. from boto.sqs.message import Message
  4. import threading, logging, time
  5.  
  6.  
  7. CONSUMERS = 30
  8. PRODUCERS = 5
  9. MAX_MESAGES = 10
  10.  
  11.  
  12.  
  13.  
  14. class Counter(threading.Thread):
  15. daemon = True
  16. name = None
  17. interval = 1
  18. count = 0
  19. total = 0
  20. lock = threading.Lock()
  21.  
  22.  
  23.  
  24. def __init__(self, *args, **kwargs):
  25. self.name = kwargs.get('name', None)
  26. super(Counter, self).__init__(*args, **kwargs)
  27.  
  28. def increment(self):
  29. with self.lock:
  30. self.count += 1
  31. self.total += 1
  32.  
  33. def value(self):
  34. with self.lock:
  35. return self.count
  36.  
  37. def run(self):
  38. start = time.time()
  39. time.clock()
  40. elapsed = 0
  41. while elapsed < self.interval:
  42. elapsed = time.time() - start
  43. print('timed:%s:%s'% (self.name, self.count, self.total))
  44. self.count = 0
  45. time.sleep(1)
  46. self.run()
  47.  
  48.  
  49.  
  50.  
  51.  
  52. class Producer(threading.Thread):
  53. daemon = True
  54. count = 0
  55. start_time = time.time()
  56.  
  57.  
  58. def __init__(self, *args, **kwargs):
  59. # keys should be in your boto config
  60. self.conn = sqs.connect_to_region("us-west-2")
  61. self.queue = self.conn.create_queue('test_test_test_deleteme')
  62. super(Producer, self).__init__(*args, **kwargs)
  63.  
  64.  
  65. def message(self, x):
  66. self.count = self.count + 1
  67. message = Message()
  68. message.set_body('message:%s:%s' % (self.count, x))
  69. return message
  70.  
  71. def bulkMessage(self, x):
  72. self.count = self.count + 1
  73. return ('%s%s'%(self.count, x), 'message:%s:%s'%(self.count, x), 0)
  74.  
  75.  
  76. def run(self):
  77. messages = [(self.bulkMessage(i)) for i in range(0,MAX_MESAGES)]
  78. self.queue.write_batch(messages)
  79. end_time = time.time()
  80. self.run()
  81.  
  82.  
  83.  
  84.  
  85.  
  86. class Consumer(threading.Thread):
  87. daemon = True
  88. start_time = time.time()
  89.  
  90.  
  91. def __init__(self, counter, *args, **kwargs):
  92. # keys should be in your boto config
  93. self.counter = counter
  94. self.conn = sqs.connect_to_region("us-west-2")
  95. self.queue = self.conn.create_queue('test_test_test_deleteme')
  96. super(Consumer, self).__init__(*args, **kwargs)
  97.  
  98.  
  99. def run(self):
  100. messages = self.queue.get_messages(
  101. num_messages=MAX_MESAGES,
  102. visibility_timeout=10
  103. )
  104.  
  105. for message in messages or []:
  106. if not message:
  107. return
  108.  
  109. print(message.get_body())
  110. self.counter.increment()
  111.  
  112. if messages:
  113. self.queue.delete_message_batch(messages)
  114.  
  115. print(self.counter.value())
  116. self.run();
  117.  
  118.  
  119.  
  120.  
  121.  
  122. def main():
  123. counter = Counter()
  124.  
  125. threads = [counter]
  126. [threads.append(Producer()) for i in range(0,PRODUCERS)]
  127. [threads.append(Consumer(counter)) for i in range(0,CONSUMERS)]
  128. for t in threads: t.start()
  129.  
  130. time.sleep(100)
  131.  
  132.  
  133.  
  134.  
  135.  
  136. if __name__ == "__main__":
  137. main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement