Advertisement
nirajs

job scheduler

Jan 29th, 2024
704
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.36 KB | None | 0 0
  1. import heapq
  2. import threading
  3. import time
  4.  
  5. def current_time_ms():
  6.     # Return the current time in milliseconds
  7.     return time.time_ns() // 1_000_000
  8.  
  9. class Scheduler:
  10.     def __init__(self):
  11.         self.closing = False
  12.         self.jobs = []
  13.         self.cx = threading.Condition()  # Initialize the condition object
  14.         self.thread = threading.Thread(target=self.run)  # Start the scheduler thread
  15.         self.thread.start()
  16.  
  17.     def schedule(self, job, delay_ms):
  18.         if self.closing:
  19.             raise ValueError("Scheduler cannot schedule jobs while closing.")
  20.         with self.cx:  # Use 'with' statement for acquiring and releasing the lock
  21.             deadline = current_time_ms() + delay_ms
  22.             heapq.heappush(self.jobs, (deadline, job))
  23.             self.cx.notify()  # Notify the scheduler thread
  24.  
  25.     def run(self):
  26.         while not self.closing:
  27.             with self.cx:
  28.                 if self.jobs:
  29.                     deadline, job = self.jobs[0]
  30.                     now = current_time_ms()
  31.                     if deadline <= now:
  32.                         heapq.heappop(self.jobs)
  33.                         job()  # Execute the job
  34.                     else:
  35.                         self.cx.wait((deadline - now) / 1000)  # Wait for the next job or a new job
  36.                 else:
  37.                     self.cx.wait()  # Wait indefinitely if there are no jobs
  38.  
  39.     def close(self):
  40.         with self.cx:  # Ensure mutual exclusion for setting 'closing' flag
  41.             self.closing = True
  42.             self.cx.notify()  # Wake up the scheduler thread to exit
  43.         self.thread.join()  # Wait for the scheduler thread to finish
  44.  
  45.     def current_time_string(self):
  46.         # Return the current time as a string for logging or debugging
  47.         return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
  48.  
  49. def make_job(job_id):
  50.     def job():
  51.         print(f"{Scheduler().current_time_string()}: Executing job {job_id}")
  52.     return job
  53.  
  54. if __name__ == '__main__':
  55.     scheduler = Scheduler()
  56.     scheduler.schedule(make_job("A"), 3000)  # Schedule job 'A' with a delay of 3000ms
  57.     scheduler.schedule(make_job("B"), 2000)  # Adjusted for clarity
  58.     scheduler.schedule(make_job("C"), 10000) # Adjusted for clarity
  59.     time.sleep(15)  # Main thread waits before closing the scheduler
  60.     scheduler.close()  # Close the scheduler
  61.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement