Advertisement
Viraax

Untitled

Mar 26th, 2023
447
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.74 KB | None | 0 0
  1. import sys
  2. import time
  3. import math
  4. import signal
  5. import traceback
  6. import collections
  7.  
  8. from tqdm import tqdm
  9. from queue import Queue, Empty, Full
  10. from threading import Thread, Lock, Event
  11. from datetime import timedelta
  12.  
  13. class Parallelize:
  14.     def __init__(self, options={}):
  15.         self.nb_threads = options.get('nb_threads', 16)
  16.         self.batch_size = options.get('batch_size', 32)
  17.         self.progress_size = options.get('progress_size', 20)
  18.         self.show_progress = options.get('show_progress', True)
  19.         self.use_tqdm = options.get('use_tqdm', True)
  20.         self.chars = [" ", "▏", "▎", "▍", "▌", "▋", "▊", "▉"]
  21.         self.lock = Lock()
  22.         self.queue = Queue()
  23.         self.event = Event()
  24.         self.threads = []
  25.         self.per_sec = 0
  26.         self.nb_done = 0
  27.         self.nb_error = 0
  28.         self.nb_total = 0
  29.         self.start_time = 0
  30.         self.last_time = 0
  31.         self.eta_last_time = 0
  32.         self.old_nb_done = 0
  33.         self.rate = 0
  34.         self.end_callback = None
  35.         self.end_args = {}
  36.         self.running = False
  37.         self.progress_thread = None
  38.         self.eta_ring = collections.deque([0], maxlen=self.batch_size)
  39.         self.rate_ring = collections.deque([0], maxlen=self.batch_size)
  40.         self.bar_format = '{l_bar}{bar:30}{r_bar}{bar:-10b}'
  41.        
  42.     def task_handler(self, event, callback, args):
  43.         batches = []
  44.  
  45.         while not event.is_set():
  46.             try:
  47.                 job = self.queue.get_nowait()
  48.  
  49.                 if (len(batches) >= self.batch_size):
  50.                     callback(batches, args)
  51.                     batches = []
  52.                     self.queue.task_done()
  53.                 else:
  54.                     batches.append(job)
  55.  
  56.                 with self.lock:
  57.                     self.nb_done += 1
  58.                     if self.use_tqdm:
  59.                         self.tqdm.update(1)
  60.  
  61.             except Empty:
  62.                 with self.lock:
  63.                     if self.nb_done + self.nb_error == self.nb_total and self.running != False:
  64.                         self.running = False
  65.                         event.set()
  66.                         self.tqdm.close()
  67.                         if self.end_callback:
  68.                             self.end_callback(self.end_args)
  69.  
  70.             except Exception as error:
  71.                 with self.lock:
  72.                     # print(traceback.format_exc())
  73.                     # print(error)
  74.                     self.nb_error += 1            
  75.                     if self.use_tqdm:
  76.                         self.tqdm.update(1)
  77.  
  78.             time.sleep(0.1)
  79.  
  80.     def signal_handler(self, sig, frame):
  81.         self.event.set()
  82.         for thread in self.threads:
  83.             thread.join()
  84.         exit()
  85.  
  86.     def progress_handler(self, event):
  87.         while not event.is_set() and self.running:
  88.             now = time.time()
  89.  
  90.             if now - self.last_time >= 1:
  91.                 self.rate_ring.append(self.nb_done - self.old_nb_done)
  92.                 self.last_time = now
  93.                 self.old_nb_done = self.nb_done
  94.  
  95.             self.rate = f'{sum(self.rate_ring) / len(self.rate_ring):.1f}'
  96.  
  97.             if self.running and self.show_progress and self.nb_done > 0 and not self.use_tqdm:
  98.                 self.draw_progress()
  99.  
  100.     def start(self, name, callback, args, total=0, end_callback=None, end_args={}):
  101.         self.task_name = name
  102.         self.per_sec = 0
  103.         self.nb_done = 0
  104.         self.nb_error = 0
  105.         self.nb_total = total
  106.         self.end_callback = end_callback
  107.         self.end_args = end_args
  108.  
  109.         signal.signal(signal.SIGINT, self.signal_handler)
  110.         signal.signal(signal.SIGTERM, self.signal_handler)
  111.  
  112.         self.start_time = time.time()
  113.         self.running = True
  114.  
  115.         progress_thread = Thread(target=self.progress_handler, args=(self.event,))
  116.         progress_thread.daemon = True
  117.         self.threads.append(progress_thread)
  118.         progress_thread.start()
  119.  
  120.         if self.use_tqdm:
  121.             self.tqdm = tqdm(desc=self.task_name, total=total, bar_format=self.bar_format)
  122.  
  123.         for i in range(self.nb_threads):
  124.             thread = Thread(target=self.task_handler, args=(self.event, callback, args))
  125.             thread.daemon = True
  126.             self.threads.append(thread)
  127.             thread.start()
  128.  
  129.         progress_thread.join()
  130.         self.stop()
  131.  
  132.     def stop(self):
  133.         self.event.set()
  134.         for thread in self.threads:
  135.             thread.join()
  136.         self.running = False
  137.  
  138.     def add_jobs(self, jobs):
  139.         try:
  140.             if not isinstance(jobs, list):
  141.                 jobs = [jobs]
  142.  
  143.             for job in jobs:
  144.                 self.queue.put_nowait(job)
  145.         except Full:
  146.             print('The queue is full, can\'t add a new job')
  147.         except Exception as e:
  148.             print(e)
  149.  
  150.     def calculate_eta(self, position, total):
  151.         if position == 0:
  152.             return [0, 0]
  153.  
  154.         now = time.time()
  155.         elapsed = now - self.start_time
  156.  
  157.         if now - self.eta_last_time >= 1:
  158.             self.eta_ring.append(elapsed * total / position)
  159.             self.eta_last_time = now
  160.  
  161.         remaining = round(sum(self.eta_ring) / len(self.eta_ring))
  162.  
  163.         return [elapsed, remaining]
  164.  
  165.     def format_eta(self, seconds):
  166.         return "{:0>8}".format(str(timedelta(seconds=seconds)))
  167.  
  168.     def draw_progress(self):
  169.         total = 1 if self.nb_total == 0 else self.nb_total
  170.         percent_done  = (self.nb_done / total) * 100
  171.         percent_error = (self.nb_error / total) * 100
  172.         percent_total = percent_done + percent_error
  173.         nb_done_and_error = self.nb_done + self.nb_error
  174.         ratio = (percent_total / 100) * self.progress_size
  175.         width = math.floor(ratio)
  176.         empty = self.progress_size - width - 1
  177.  
  178.         slice_char = "" if empty < 0 else self.chars[math.floor((ratio % 1) * len(self.chars))]
  179.         bar   = '[{0}{1}]'.format('█' * width + slice_char, ' ' * empty)
  180.        
  181.         name  = self.task_name
  182.         bar   = '[{0}{1}]'.format('█' * width + slice_char, ' ' * empty)
  183.  
  184.         done  = f'Done\t{self.nb_done} ({percent_done:.2f} %)'
  185.         error = f'Error\t{self.nb_error} ({percent_error:.2f} %)'
  186.         total_display = f'Total\t{nb_done_and_error} / {total} ({percent_total:.2f} %)'
  187.         rate  = f'Rate\t{self.rate} it/s'
  188.  
  189.         elapsed, remaining = self.calculate_eta(nb_done_and_error, total)
  190.         eta = f'Elapsed\t{self.format_eta(round(elapsed))}\nETA\t{self.format_eta(round(remaining - elapsed))}'
  191.  
  192.         if self.nb_done > 0:
  193.             for _ in range(0, 7):
  194.                 sys.stdout.write("\x1b[1A\x1b[2K")
  195.  
  196.         print(f'{name}\n{bar}\n{done}\n{error}\n{total_display}\n{eta}\n{rate}', end='\r')
  197.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement