Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import time
- import math
- import signal
- import traceback
- import collections
- from tqdm import tqdm
- from queue import Queue, Empty, Full
- from threading import Thread, Lock, Event
- from datetime import timedelta
class Parallelize:
    """Process queued jobs in batches on a pool of worker threads.

    Jobs are queued with :meth:`add_jobs`.  :meth:`start` then spawns
    ``nb_threads`` workers that pull jobs from the shared queue, group them
    into lists of at most ``batch_size`` items and pass each list to a user
    callback as ``callback(batch, args)``.  Progress is reported either
    through tqdm or through a built-in multi-line text display.

    Recognised ``options`` keys (all optional):
        nb_threads (int): number of worker threads (default 16).
        batch_size (int): maximum number of jobs per callback call; also
            the window size of the rate/ETA moving averages (default 32).
        progress_size (int): width of the built-in bar (default 20).
        show_progress (bool): draw the built-in display (default True).
        use_tqdm (bool): report through tqdm instead (default True).
        poll_interval (float): seconds an idle worker and the progress
            loop wait between polls (default 0.1).
    """

    def __init__(self, options=None):
        """Store the configuration and initialise all run-time state.

        Args:
            options: dict of settings (see the class docstring); ``None``
                means "all defaults".
        """
        # A fresh dict per call instead of a shared mutable default.
        options = {} if options is None else options
        self.nb_threads = options.get('nb_threads', 16)
        self.batch_size = options.get('batch_size', 32)
        self.progress_size = options.get('progress_size', 20)
        self.show_progress = options.get('show_progress', True)
        self.use_tqdm = options.get('use_tqdm', True)
        self.poll_interval = options.get('poll_interval', 0.1)
        # Eighth-block glyphs used for the fractional cell of the text bar.
        self.chars = [" ", "▏", "▎", "▍", "▌", "▋", "▊", "▉"]
        self.lock = Lock()
        self.queue = Queue()
        self.event = Event()
        self.threads = []
        self.per_sec = 0
        self.nb_done = 0
        self.nb_error = 0
        self.nb_total = 0
        self.start_time = 0
        self.last_time = 0
        self.eta_last_time = 0
        self.old_nb_done = 0
        self.rate = 0
        self.end_callback = None
        self.end_args = {}
        self.running = False
        self.progress_thread = None
        self.task_name = ''
        self.tqdm = None
        # The rings start empty: a seeded 0 would drag the averages down
        # until it is pushed out of the window.
        window = max(1, self.batch_size)
        self.eta_ring = collections.deque(maxlen=window)
        self.rate_ring = collections.deque(maxlen=window)
        self.bar_format = '{l_bar}{bar:30}{r_bar}{bar:-10b}'
        self._progress_drawn = False

    def _record(self, count, failed):
        """Add ``count`` jobs to the error or done counter and tick tqdm."""
        with self.lock:
            if failed:
                self.nb_error += count
            else:
                self.nb_done += count
            if self.use_tqdm and self.tqdm is not None:
                self.tqdm.update(count)

    def _run_batch(self, callback, batch, args):
        """Call ``callback(batch, args)`` and account for every job in it."""
        try:
            callback(batch, args)
        except Exception:
            # Best effort by design: a failing batch is counted, not fatal.
            self._record(len(batch), failed=True)
        else:
            self._record(len(batch), failed=False)
        finally:
            # One task_done() per dequeued job keeps Queue.join() usable.
            for _ in batch:
                self.queue.task_done()

    def _finish_if_complete(self, event):
        """End the run once every expected job is accounted for.

        Returns True only for the single worker that performed the
        shutdown (and therefore ran ``end_callback``).
        """
        with self.lock:
            if not self.running:
                return False
            if self.nb_done + self.nb_error < self.nb_total:
                return False
            self.running = False
            event.set()
            if self.use_tqdm and self.tqdm is not None:
                self.tqdm.close()
        # Called outside the lock so the callback may safely use this object.
        if self.end_callback:
            self.end_callback(self.end_args)
        return True

    def task_handler(self, event, callback, args):
        """Worker loop: batch jobs from the queue and feed them to ``callback``.

        A batch is dispatched when it reaches ``batch_size`` jobs, or as
        soon as the queue runs dry, so no job is ever held back or dropped.

        Args:
            event: ``threading.Event`` that stops the loop when set.
            callback: callable invoked as ``callback(batch, args)``.
            args: opaque value forwarded to ``callback``.
        """
        batch = []
        while not event.is_set():
            try:
                job = self.queue.get_nowait()
            except Empty:
                if batch:
                    # Flush the partial batch, then look at the queue again.
                    self._run_batch(callback, batch, args)
                    batch = []
                    continue
                if self._finish_if_complete(event):
                    break
                # Idle: wait, but wake immediately if the run is stopped.
                event.wait(self.poll_interval)
                continue
            batch.append(job)
            if len(batch) >= self.batch_size:
                self._run_batch(callback, batch, args)
                batch = []

    def signal_handler(self, sig, frame):
        """SIGINT/SIGTERM handler: stop all threads, then exit the process."""
        self.stop()
        sys.exit()

    def progress_handler(self, event):
        """Progress loop: refresh the rate once a second and redraw the display.

        Args:
            event: ``threading.Event`` that stops the loop when set.
        """
        while not event.is_set() and self.running:
            now = time.time()
            if now - self.last_time >= 1:
                done = self.nb_done
                self.rate_ring.append(done - self.old_nb_done)
                self.last_time = now
                self.old_nb_done = done
                self.rate = f'{sum(self.rate_ring) / len(self.rate_ring):.1f}'
            if self.running and self.show_progress and self.nb_done > 0 and not self.use_tqdm:
                self.draw_progress()
            # Sleep between refreshes instead of spinning on the CPU.
            event.wait(self.poll_interval)
        # Draw the final state so short runs still show their result.
        if self.show_progress and self.nb_done > 0 and not self.use_tqdm:
            self.draw_progress()

    def start(self, name, callback, args, total=0, end_callback=None, end_args=None):
        """Run ``callback`` over the queued jobs and block until finished.

        Args:
            name: label shown in the progress display.
            callback: callable invoked as ``callback(batch, args)``.
            args: opaque value forwarded to ``callback``.
            total: number of jobs expected; when 0, the number of jobs
                currently queued is used.
            end_callback: optional callable invoked once as
                ``end_callback(end_args)`` when all jobs are accounted for.
            end_args: value passed to ``end_callback`` (default ``{}``).
        """
        self.task_name = name
        self.per_sec = 0
        self.nb_done = 0
        self.nb_error = 0
        self.old_nb_done = 0
        self.rate = 0
        self.nb_total = total if total else self.queue.qsize()
        self.end_callback = end_callback
        self.end_args = {} if end_args is None else end_args
        # Reset per-run state so the same instance can be started again.
        self.event.clear()
        self.threads = []
        self.eta_ring.clear()
        self.rate_ring.clear()
        self.eta_last_time = 0
        self._progress_drawn = False
        try:
            signal.signal(signal.SIGINT, self.signal_handler)
            signal.signal(signal.SIGTERM, self.signal_handler)
        except ValueError:
            # signal.signal() is only allowed in the main thread; when
            # started elsewhere, run without custom handlers.
            pass
        self.start_time = time.time()
        self.last_time = self.start_time
        self.running = True
        progress_thread = Thread(target=self.progress_handler, args=(self.event,))
        progress_thread.daemon = True
        self.progress_thread = progress_thread
        self.threads.append(progress_thread)
        progress_thread.start()
        if self.use_tqdm:
            self.tqdm = tqdm(desc=self.task_name, total=self.nb_total, bar_format=self.bar_format)
        for _ in range(self.nb_threads):
            worker = Thread(target=self.task_handler, args=(self.event, callback, args))
            worker.daemon = True
            self.threads.append(worker)
            worker.start()
        progress_thread.join()
        self.stop()

    def stop(self):
        """Signal every thread to stop and wait for them to finish."""
        self.event.set()
        for thread in self.threads:
            try:
                thread.join()
            except RuntimeError:
                # Raised when joining the current thread (stop() called
                # from a callback) or a thread that never started.
                pass
        self.running = False

    def add_jobs(self, jobs):
        """Queue one job, or every job of a list.

        Args:
            jobs: a single job object, or a ``list`` of jobs.  Any non-list
                value (tuples included) is queued as one job.
        """
        try:
            if not isinstance(jobs, list):
                jobs = [jobs]
            for job in jobs:
                self.queue.put_nowait(job)
        except Full:
            print('The queue is full, can\'t add a new job')
        except Exception as e:
            print(e)

    def calculate_eta(self, position, total):
        """Estimate the run duration from the progress made so far.

        Args:
            position: number of jobs already accounted for.
            total: number of jobs expected.

        Returns:
            ``[elapsed, estimate]``: seconds since the start, and the
            smoothed estimate of the *whole* run's duration in seconds
            (subtract ``elapsed`` to get the time left).  ``[0, 0]`` when
            ``position`` is 0.
        """
        if position == 0:
            return [0, 0]
        now = time.time()
        elapsed = now - self.start_time
        # Sample at most once a second, but always keep at least one sample.
        if not self.eta_ring or now - self.eta_last_time >= 1:
            self.eta_ring.append(elapsed * total / position)
            self.eta_last_time = now
        remaining = round(sum(self.eta_ring) / len(self.eta_ring))
        return [elapsed, remaining]

    def format_eta(self, seconds):
        """Format a number of seconds as a zero-padded ``HH:MM:SS`` string."""
        return "{:0>8}".format(str(timedelta(seconds=seconds)))

    def draw_progress(self):
        """Draw (or redraw in place) the eight-line text progress display."""
        total = 1 if self.nb_total == 0 else self.nb_total
        percent_done = (self.nb_done / total) * 100
        percent_error = (self.nb_error / total) * 100
        percent_total = percent_done + percent_error
        nb_done_and_error = self.nb_done + self.nb_error
        # Clamp so the bar never grows past its frame above 100 %.
        ratio = min((percent_total / 100) * self.progress_size, self.progress_size)
        width = math.floor(ratio)
        empty = self.progress_size - width - 1
        slice_char = "" if empty < 0 else self.chars[math.floor((ratio % 1) * len(self.chars))]
        name = self.task_name
        bar = '[{0}{1}]'.format('█' * width + slice_char, ' ' * empty)
        done = f'Done\t{self.nb_done} ({percent_done:.2f} %)'
        error = f'Error\t{self.nb_error} ({percent_error:.2f} %)'
        total_display = f'Total\t{nb_done_and_error} / {total} ({percent_total:.2f} %)'
        rate = f'Rate\t{self.rate} it/s'
        elapsed, remaining = self.calculate_eta(nb_done_and_error, total)
        # The estimate can lag behind reality; never show a negative ETA.
        time_left = max(0, round(remaining - elapsed))
        eta = f'Elapsed\t{self.format_eta(round(elapsed))}\nETA\t{self.format_eta(time_left)}'
        if self._progress_drawn:
            # Clear the current (last) line, then walk up over the 7 lines
            # above it. Skipped on the first draw so earlier terminal
            # output is not erased.
            sys.stdout.write("\x1b[2K" + "\x1b[1A\x1b[2K" * 7)
        self._progress_drawn = True
        print(f'{name}\n{bar}\n{done}\n{error}\n{total_display}\n{eta}\n{rate}', end='\r')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement