Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import multiprocessing, requests
- import copy
- import time
- import pprint
- from enum import Enum
- from queue import Queue
- from datetime import datetime, timedelta
- from multiprocessing import Pool
class QueueState(Enum):
    """Named states stored in a queue's ``state`` attribute."""

    # Nothing is running.
    Inactive = 0
    # Waiting for the next scheduled task to become due.
    Sleeping = 1
    # Presumably: due tasks are being executed.
    Processing = 2
class DelayQueue:
    """Run queued tasks once their scheduled time has passed.

    Tasks live in the plain list ``self.queue``. ``run`` executes whatever is
    due, then sleeps until the earliest remaining task. ``self.state`` holds
    the current ``QueueState``.
    """

    def __init__(self):
        self.state = QueueState.Inactive
        self.queue = []

    def run(self):
        """Process tasks in due-time order until the queue is empty.

        Blocks the calling thread. A task is dropped from the queue when its
        ``process()`` returns a truthy value; otherwise it stays queued and is
        retried on the next pass. Exceptions raised by a task propagate.
        """
        while True:
            # Take one timestamp so every task is judged against the same
            # instant instead of a clock that moves during the scan.
            now = datetime.now()
            currentwork = sorted(
                (item for item in self.queue if item.time <= now),
                key=lambda item: item.time,
            )
            print(len(currentwork), 'tasks ready for processing')

            if currentwork:
                print('Processing currentwork')
                self.state = QueueState.Processing

            unfinished = []
            for i, task in enumerate(currentwork):
                print('Processing item', i)
                if task.process():
                    self.queue.remove(task)
                else:
                    unfinished.append(task)
            print('Processing current tasks complete')

            if not self.queue:
                print('Queue empty closing queue')
                self.state = QueueState.Inactive
                break

            if unfinished:
                # Some due task reported failure: go straight to the next
                # pass and retry it rather than sleeping.
                continue

            print("No work ready to process")
            self.queue.sort(key=lambda item: item.time)
            nexttask = self.queue[0]
            print("Getting next task", nexttask)
            # The next task may have become due while earlier ones ran;
            # clamp so a negative wait is never reported or slept on.
            wait = max((nexttask.time - datetime.now()).total_seconds(), 0.0)
            print('Sleeping', wait, 'seconds')
            if wait > 0:
                self.state = QueueState.Sleeping
                time.sleep(wait)

    def add(self, func, delay=0, **kwargs):
        """Schedule ``func(**kwargs)`` to run ``delay`` milliseconds from now.

        Only enqueues the task; processing starts when ``run`` is called.
        """
        due = datetime.now() + timedelta(milliseconds=delay)
        self.queue.append(Task(func, due, **kwargs))

    def remove(self):
        """Placeholder for task removal; always raises NotImplementedError."""
        raise NotImplementedError
class Task:
    """A callable bundled with its scheduled run time and keyword arguments.

    Attributes are ``func`` (the callable), ``time`` (when it becomes due)
    and ``args`` (the keyword arguments passed to ``func``).
    """

    def __init__(self, func, time, **kwargs):
        self.func = func
        self.time = time
        self.args = kwargs
        print("New task", (self._label(), time, kwargs))

    def _label(self):
        """Return a printable name for ``func``.

        Callables such as ``functools.partial`` objects have no ``__name__``,
        so fall back to their repr instead of raising AttributeError.
        """
        return getattr(self.func, '__name__', repr(self.func))

    def process(self):
        """Call ``func`` with the stored keyword arguments and return True.

        The callable's own return value is discarded; any exception it raises
        propagates to the caller.
        """
        self.func(**self.args)
        return True

    def __str__(self):
        return str((self._label(), self.time, self.args))
def get(**kwargs):
    """Issue an HTTP GET request and print the response status code.

    Keyword Args:
        url: Address to request. Required; a missing key raises KeyError.
        timeout: Seconds to wait for the server, forwarded to
            ``requests.get``. Defaults to 10 so one unresponsive host
            cannot block the caller indefinitely.

    Exceptions raised by ``requests.get`` are not caught here.
    """
    url = kwargs['url']
    timeout = kwargs.get('timeout', 10)
    print('Requesting', url)
    r = requests.get(url=url, timeout=timeout)
    print("Status Code", r.status_code)
# Demo run: schedule six GET requests with staggered delays (milliseconds),
# then process the queue until it drains.
queue = DelayQueue()
for _delay_ms, _url in (
    (1500, 'http://www.google.com'),
    (4300, 'http://www.yahoo.com'),
    (3900, 'http://www.att.com'),
    (2500, 'http://www.nbc.com'),
    (2450, 'http://www.msn.com'),
    (2450, 'http://www.gmail.com'),
):
    queue.add(get, _delay_ms, url=_url)
queue.run()
- New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 51, 843225), {'url': 'http://www.google.com'})
- New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 643616), {'url': 'http://www.yahoo.com'})
- New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 243917), {'url': 'http://www.att.com'})
- New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 844260), {'url': 'http://www.nbc.com'})
- New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 794472), {'url': 'http://www.msn.com'})
- New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 794630), {'url': 'http://www.gmail.com'})
- 0 tasks ready for processing
- Processing current tasks complete
- No work ready to process
- Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 51, 843225), {'url': 'http://www.google.com'})
- Sleeping 1.498037 seconds
- 1 tasks ready for processing
- Processing currentwork
- Processing item 0
- Requesting http://www.google.com
- Status Code 200
- Processing current tasks complete
- No work ready to process
- Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 794472), {'url': 'http://www.msn.com'})
- Sleeping 0.820134 seconds
- 2 tasks ready for processing
- Processing currentwork
- Processing item 0
- Requesting http://www.msn.com
- Status Code 200
- Processing item 1
- Requesting http://www.gmail.com
- Status Code 200
- Processing current tasks complete
- No work ready to process
- Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 844260), {'url': 'http://www.nbc.com'})
- Sleeping -0.540612 seconds
- 1 tasks ready for processing
- Processing currentwork
- Processing item 0
- Requesting http://www.nbc.com
- Status Code 200
- Processing current tasks complete
- No work ready to process
- Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 243917), {'url': 'http://www.att.com'})
- Sleeping 0.708722 seconds
- 1 tasks ready for processing
- Processing currentwork
- Processing item 0
- Requesting http://www.att.com
- Status Code 200
- Processing current tasks complete
- No work ready to process
- Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 643616), {'url': 'http://www.yahoo.com'})
- Sleeping 0.143595 seconds
- 1 tasks ready for processing
- Processing currentwork
- Processing item 0
- Requesting http://www.yahoo.com
- Status Code 200
- Processing current tasks complete
- Queue empty closing queue
Add Comment
Please, Sign In to add comment