Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import time
- import os
- class JobDoneError(Exception):
- pass
- class Job:
- """A file backed job, completed steps are commited to disk so it's resumable"""
- def __init__(self, job_id, steps, filestore):
- self._job_id = job_id
- self._steps = steps
- self._offset = 0
- self._filestore = filestore
- self._rebuild_from_disk()
- def run(self):
- while True:
- try:
- self._run_next_step()
- except JobDoneError:
- print('Job complete!')
- break
- def _run_next_step(self):
- current_step = self._dequeue_step()
- current_step.run()
- self._commit_current_step()
- def _dequeue_step(self):
- if self._offset >= len(self._steps):
- raise JobDoneError()
- return self._steps[self._offset]
- def _commit_current_step(self):
- next_offset = self._offset + 1
- with open(self._filestore, 'a') as fout:
- fout.write('{}:{}\n'.format(self._job_id, next_offset))
- fout.flush()
- self._offset = next_offset
- def _rebuild_from_disk(self):
- # Ensure file exists
- if not os.path.exists(self._filestore):
- with open(self._filestore, 'w'):
- pass
- with open(self._filestore) as fin:
- for line in fin:
- # Ignore empty lines
- if len(line.strip()) == 0:
- continue
- job_id, offset = line.split(':')
- if job_id == self._job_id:
- self._offset = int(offset)
- class Step:
- def __init__(self, name):
- self._name = name
- def run(self):
- print(self._name)
- time.sleep(1)
- step1 = Step('trigger query 1')
- step2 = Step('trigger query 2')
- step3 = Step('transfer results to store')
- job = Job(job_id='my_awesome_job', steps=[step1, step2, step3], filestore='db.txt')
- job.run()
Add Comment
Please, Sign In to add comment