Guest User

Untitled

a guest
Jan 19th, 2018
189
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.12 KB | None | 0 0
  1. import multiprocessing, requests
  2. import copy
  3. import time
  4. import pprint
  5. from enum import Enum
  6. from queue import Queue
  7.  
  8. from datetime import datetime, timedelta
  9. from multiprocessing import Pool
  10.  
  11. class QueueState(Enum):
  12. Inactive = 0
  13. Sleeping = 1
  14. Processing = 2
  15.  
  16. class DelayQueue:
  17.  
  18. def __init__(self):
  19. self.state = QueueState.Inactive
  20. self.queue = []
  21.  
  22. def run(self):
  23. while True:
  24. currentwork = [item for item in self.queue if item.time <= datetime.now()]
  25. currentwork.sort(key=lambda x: x.time, reverse=False)
  26.  
  27. print(len(currentwork), 'tasks ready for processing')
  28. #Process the currentwork
  29. for i, task in list(enumerate(currentwork)):
  30. if i == 0:
  31. print('Processing currentwork')
  32.  
  33. print('Processing item', i)
  34. if task.process() == True:
  35. currentwork.remove(task)
  36. self.queue.remove(task)
  37.  
  38. print('Processing current tasks complete')
  39. #No work ready yet
  40. if len(currentwork) == 0 and len(self.queue) > 0:
  41. print("No work ready to process")
  42.  
  43. if len(self.queue) == 0:
  44. print('Queue empty closing queue')
  45. self.state = QueueState.Inactive
  46. break
  47. elif len(currentwork) == 0:
  48. self.queue.sort(key=lambda x: x.time, reverse=False)
  49. nexttask = self.queue[0]
  50. print("Getting next task", nexttask)
  51. delta = nexttask.time - datetime.now()
  52. print('Sleeping', delta.total_seconds(), 'seconds')
  53. self.state = QueueState.Sleeping
  54. if delta.total_seconds() > 0:
  55. time.sleep(delta.total_seconds())
  56.  
  57. def add(self, func, delay=0, **kwargs):
  58. self.queue.append(Task(func, datetime.now() + timedelta(milliseconds=delay), **kwargs))
  59. #if self.state == QueueState.Inactive:
  60. #self.run()
  61.  
  62. def remove():
  63. raise NotImplementedError
  64.  
  65. class Task:
  66. def __init__(self, func, time, **kwargs):
  67. print("New task", (func.__name__, time, kwargs))
  68. self.func = func
  69. self.time = time
  70. self.args = kwargs
  71.  
  72. def process(self):
  73. self.func(**self.args)
  74. return True
  75. def __str__(self):
  76. return str((self.func.__name__, self.time, self.args))
  77.  
  78. def get(**kwargs):
  79. url = kwargs['url']
  80. print('Requesting', url)
  81. r = requests.get(url=url)
  82. print("Status Code", r.status_code)
  83.  
  84.  
  85. queue = DelayQueue()
  86.  
  87. queue.add(get, 1500, url='http://www.google.com')
  88. queue.add(get, 4300, url='http://www.yahoo.com')
  89. queue.add(get, 3900, url='http://www.att.com')
  90. queue.add(get, 2500, url='http://www.nbc.com')
  91. queue.add(get, 2450, url='http://www.msn.com')
  92. queue.add(get, 2450, url='http://www.gmail.com')
  93.  
  94. queue.run()
  95.  
  96. New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 51, 843225), {'url': 'http://www.google.com'})
  97. New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 643616), {'url': 'http://www.yahoo.com'})
  98. New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 243917), {'url': 'http://www.att.com'})
  99. New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 844260), {'url': 'http://www.nbc.com'})
  100. New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 794472), {'url': 'http://www.msn.com'})
  101. New task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 794630), {'url': 'http://www.gmail.com'})
  102. 0 tasks ready for processing
  103. Processing current tasks complete
  104. No work ready to process
  105. Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 51, 843225), {'url': 'http://www.google.com'})
  106. Sleeping 1.498037 seconds
  107. 1 tasks ready for processing
  108. Processing currentwork
  109. Processing item 0
  110. Requesting http://www.google.com
  111. Status Code 200
  112. Processing current tasks complete
  113. No work ready to process
  114. Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 794472), {'url': 'http://www.msn.com'})
  115. Sleeping 0.820134 seconds
  116. 2 tasks ready for processing
  117. Processing currentwork
  118. Processing item 0
  119. Requesting http://www.msn.com
  120. Status Code 200
  121. Processing item 1
  122. Requesting http://www.gmail.com
  123. Status Code 200
  124. Processing current tasks complete
  125. No work ready to process
  126. Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 52, 844260), {'url': 'http://www.nbc.com'})
  127. Sleeping -0.540612 seconds
  128. 1 tasks ready for processing
  129. Processing currentwork
  130. Processing item 0
  131. Requesting http://www.nbc.com
  132. Status Code 200
  133. Processing current tasks complete
  134. No work ready to process
  135. Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 243917), {'url': 'http://www.att.com'})
  136. Sleeping 0.708722 seconds
  137. 1 tasks ready for processing
  138. Processing currentwork
  139. Processing item 0
  140. Requesting http://www.att.com
  141. Status Code 200
  142. Processing current tasks complete
  143. No work ready to process
  144. Getting next task ('get', datetime.datetime(2018, 1, 19, 17, 18, 54, 643616), {'url': 'http://www.yahoo.com'})
  145. Sleeping 0.143595 seconds
  146. 1 tasks ready for processing
  147. Processing currentwork
  148. Processing item 0
  149. Requesting http://www.yahoo.com
  150. Status Code 200
  151. Processing current tasks complete
  152. Queue empty closing queue
Add Comment
Please, Sign In to add comment