Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from twisted.internet.task import LoopingCall
- class Manager:
- def __init__(self):
- self._job2loop = {}
- self._jobDone = {}
- def register(self, task):
- self._job2loop[task.name] = LoopingCall(task.job())
- self._jobDone[task.name] = task.job_done
- def start(self, schedule):
- if not type(schedule) is list:
- schedule = [schedule]
- for task, timer in schedule:
- if task.name in self._job2loop:
- self._job2loop[task.name].start(timer, False)
- else:
- print 'Job has not scheduled: %s' % task.name
- def check_end(self, task):
- if self._jobDone[task.name]():
- self._job2loop[task.name].stop()
- del self._job2loop[task.name]
- if not self._job2loop:
- reactor.stop()
- class Countdown(object):
- def __init__(self, name, start):
- self.name = name
- self.counter = start
- def register(self, manager):
- self.manager = manager
- manager.register(self)
- def job_done(self):
- if not self.counter:
- return True
- return False
- def job(self):
- return self.count
- def count(self, finallize=lambda x: None):
- print 'counter %s:' % str(self.name), self.counter, '...'
- self.manager.check_end(self)
- self.counter -= 1
- mng = Manager()
- cntA = Countdown('AAA', 5)
- cntB = Countdown('BB', 10)
- cntC = Countdown('C', 7)
- cntA.register(mng)
- cntB.register(mng)
- cntC.register(mng)
- mng.start([(cntA, 1), (cntB, 0.3)])
- mng.start((cntC, 0.5))
- from twisted.internet import reactor
- reactor.run()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement