Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- from boto import sqs
- from boto.sqs.message import Message
- import threading, logging, time
- CONSUMERS = 30
- PRODUCERS = 5
- MAX_MESAGES = 10
- class Counter(threading.Thread):
- daemon = True
- name = None
- interval = 1
- count = 0
- total = 0
- lock = threading.Lock()
- def __init__(self, *args, **kwargs):
- self.name = kwargs.get('name', None)
- super(Counter, self).__init__(*args, **kwargs)
- def increment(self):
- with self.lock:
- self.count += 1
- self.total += 1
- def value(self):
- with self.lock:
- return self.count
- def run(self):
- start = time.time()
- time.clock()
- elapsed = 0
- while elapsed < self.interval:
- elapsed = time.time() - start
- print('timed:%s:%s'% (self.name, self.count, self.total))
- self.count = 0
- time.sleep(1)
- self.run()
- class Producer(threading.Thread):
- daemon = True
- count = 0
- start_time = time.time()
- def __init__(self, *args, **kwargs):
- # keys should be in your boto config
- self.conn = sqs.connect_to_region("us-west-2")
- self.queue = self.conn.create_queue('test_test_test_deleteme')
- super(Producer, self).__init__(*args, **kwargs)
- def message(self, x):
- self.count = self.count + 1
- message = Message()
- message.set_body('message:%s:%s' % (self.count, x))
- return message
- def bulkMessage(self, x):
- self.count = self.count + 1
- return ('%s%s'%(self.count, x), 'message:%s:%s'%(self.count, x), 0)
- def run(self):
- messages = [(self.bulkMessage(i)) for i in range(0,MAX_MESAGES)]
- self.queue.write_batch(messages)
- end_time = time.time()
- self.run()
- class Consumer(threading.Thread):
- daemon = True
- start_time = time.time()
- def __init__(self, counter, *args, **kwargs):
- # keys should be in your boto config
- self.counter = counter
- self.conn = sqs.connect_to_region("us-west-2")
- self.queue = self.conn.create_queue('test_test_test_deleteme')
- super(Consumer, self).__init__(*args, **kwargs)
- def run(self):
- messages = self.queue.get_messages(
- num_messages=MAX_MESAGES,
- visibility_timeout=10
- )
- for message in messages or []:
- if not message:
- return
- print(message.get_body())
- self.counter.increment()
- if messages:
- self.queue.delete_message_batch(messages)
- print(self.counter.value())
- self.run();
- def main():
- counter = Counter()
- threads = [counter]
- [threads.append(Producer()) for i in range(0,PRODUCERS)]
- [threads.append(Consumer(counter)) for i in range(0,CONSUMERS)]
- for t in threads: t.start()
- time.sleep(100)
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement