Guest User

Untitled

a guest
Apr 20th, 2018
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.89 KB | None | 0 0
  1. import time
  2. import os
  3.  
  4.  
  5. class JobDoneError(Exception):
  6. pass
  7.  
  8.  
  9. class Job:
  10. """A file backed job, completed steps are commited to disk so it's resumable"""
  11.  
  12. def __init__(self, job_id, steps, filestore):
  13. self._job_id = job_id
  14. self._steps = steps
  15. self._offset = 0
  16. self._filestore = filestore
  17. self._rebuild_from_disk()
  18.  
  19. def run(self):
  20. while True:
  21. try:
  22. self._run_next_step()
  23. except JobDoneError:
  24. print('Job complete!')
  25. break
  26.  
  27. def _run_next_step(self):
  28. current_step = self._dequeue_step()
  29. current_step.run()
  30. self._commit_current_step()
  31.  
  32. def _dequeue_step(self):
  33. if self._offset >= len(self._steps):
  34. raise JobDoneError()
  35. return self._steps[self._offset]
  36.  
  37. def _commit_current_step(self):
  38. next_offset = self._offset + 1
  39. with open(self._filestore, 'a') as fout:
  40. fout.write('{}:{}\n'.format(self._job_id, next_offset))
  41. fout.flush()
  42. self._offset = next_offset
  43.  
  44. def _rebuild_from_disk(self):
  45. # Ensure file exists
  46. if not os.path.exists(self._filestore):
  47. with open(self._filestore, 'w'):
  48. pass
  49. with open(self._filestore) as fin:
  50. for line in fin:
  51. # Ignore empty lines
  52. if len(line.strip()) == 0:
  53. continue
  54. job_id, offset = line.split(':')
  55. if job_id == self._job_id:
  56. self._offset = int(offset)
  57.  
  58.  
  59. class Step:
  60. def __init__(self, name):
  61. self._name = name
  62.  
  63. def run(self):
  64. print(self._name)
  65. time.sleep(1)
  66.  
  67.  
  68. step1 = Step('trigger query 1')
  69. step2 = Step('trigger query 2')
  70. step3 = Step('transfer results to store')
  71. job = Job(job_id='my_awesome_job', steps=[step1, step2, step3], filestore='db.txt')
  72. job.run()
Add Comment
Please, Sign In to add comment