Advertisement
vidzor

general_utils.py

Dec 9th, 2021
982
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.88 KB | None | 0 0
  1.  
import concurrent.futures
import os
import pickle
import re
import sys
import time
from pathlib import Path
from threading import Lock

import cv2
  10.  
  11. # multiprocessing.set_start_method('spawn', True)
  12. os.environ['OPENBLAS_NUM_THREADS'] = '1' # 'None'
  13. os.environ['MKL_NUM_THREADS'] = '1' # 'None'
  14. os.environ['NUMEXPR_NUM_THREADS'] = '1' # 'None'
  15. os.environ['OMP_NUM_THREADS'] = '1' # 'None'
  16. import numpy as np
  17. cv2.setNumThreads(1)
  18.  
  19. def printProgressBar( iteration, total, prefix='', suffix='', decimals=1, length=100, fill='=' ):
  20.     """
  21.     Call in a loop to create terminal progress bar
  22.     @params:
  23.         iteration   - Required  : current iteration (Int)
  24.         total       - Required  : total iterations (Int)
  25.         prefix      - Optional  : prefix string (Str)
  26.         suffix      - Optional  : suffix string (Str)
  27.         decimals    - Optional  : positive number of decimals in percent complete (Int)
  28.         length      - Optional  : character length of bar (Int)
  29.         fill        - Optional  : bar fill character (Str)
  30.     Example usage:
  31.         for i, img in enumerate(img_files):
  32.             printProgressBar(
  33.                 i+1, len(img_files), prefix = f'Loading Data: {i+1}/{len(img_files)}', length = 50
  34.             )
  35.     """
  36.     percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
  37.     filledLength = int(length * iteration // total)
  38.     bar = fill * filledLength + '.' * (length - filledLength)
  39.     os.sys.stdout.write('\r %s |%s| %s%% %s' % (prefix, bar, percent, suffix))
  40.     os.sys.stdout.flush()
  41.     # if iteration == total: os.sys.stdout.write('\n')
  42.  
  43. def load_file_paths( root_path, pattern_matcher, recursive=False ):
  44.     """
  45.     loads file paths, uses REGEX for filename/extension pattern matching
  46.     :param root_path: parent directory to load from
  47.     :param pattern_matcher: a compiled REGEX object for filtering filenames
  48.         example1: pattern_matcher = re.compile( r'[.](jpe?g|png)$', re.IGNORECASE ) -> loads all JPGs and PNGs
  49.         example2: pattern_matcher = re.compile( r'.' ) -> loads everything
  50.     :param recursive: True = will look for matches recursively, False = only in root_path
  51.     :return: list of loaded paths
  52.     """
  53.     if not isinstance( pattern_matcher, re.compile('').__class__ ):
  54.         print(' Invalid pattern_matcher: need to pass in re.compile(...) REGEX object. ')
  55.         return []
  56.     root_path = Path(root_path)
  57.     all_paths = []
  58.    
  59.     def load( root_path, pattern_matcher, recursive ):
  60.        
  61.         for file_path in os.listdir( str(root_path) ):
  62.             full_file_path = Path( root_path, file_path )
  63.            
  64.             if pattern_matcher.search( str(file_path) ):
  65.                 all_paths.append( full_file_path )
  66.            
  67.             if full_file_path.is_dir() and recursive:
  68.                 load( full_file_path, pattern_matcher, recursive )
  69.  
  70.     load( root_path, pattern_matcher, recursive )
  71.    
  72.     return all_paths
  73.  
  74.  
  75. class CacheUtility:
  76.     """
  77.     A class for caching python objects with pickle
  78.     """
  79.     @staticmethod
  80.     def load( cache_file_path, verbose=True ):
  81.         if cache_file_path.exists():
  82.             with open(str(cache_file_path), 'rb') as cache_fid:
  83.                 data = pickle.load( cache_fid )
  84.             if verbose: print(f" Read from cache: {cache_file_path} ")
  85.             return data
  86.  
  87.     @staticmethod
  88.     def save( cache_file_path, data, verbose=True ):
  89.         if not cache_file_path.exists():
  90.             with open(str(cache_file_path), 'wb') as cache_fid:
  91.                 pickle.dump( data, cache_fid )
  92.             if verbose: print(f" Cached: {cache_file_path} ")
  93.  
  94.  
  95. class ConcurrentUtility(object):
  96.  
  97.     def __init__(self,
  98.                  num_workers, input_array_size, output_array=None, progressbar_message=None, **kwargs
  99.                  ):
  100.         self.num_workers = num_workers
  101.         self.num_subsets = self.num_workers
  102.  
  103.         self.input_array_size = input_array_size
  104.         self.output_array = np.array(output_array) if output_array is not None else None
  105.  
  106.         self.kwargs = kwargs
  107.  
  108.         self.subsets_indices = self.create_subsets_indices(
  109.             size=self.input_array_size, num_subsets=self.num_subsets
  110.         )
  111.         self.subset_function = None
  112.  
  113.         self.lock = Lock()
  114.         self.progress = [0, sum(len(si) for si in self.subsets_indices)]
  115.         self.progress_tracking_message = 'Processing:' if progressbar_message is None else progressbar_message
  116.  
  117.         self.time_delta = 0.0
  118.  
  119.     def run(self, verbose=True):
  120.         time_start = time.time()
  121.  
  122.         futures = [None for _ in range(self.num_subsets)]
  123.         num_workers = self.num_workers
  124.         if verbose: num_workers += 1
  125.  
  126.         with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
  127.             # submit progress tracking daemon (progressbar):
  128.             if verbose: executor.submit(self.__progress_tracking_daemon, self)
  129.             # submit subset workers:
  130.             for ss in range(self.num_subsets):
  131.                 futures[ss] = executor.submit(self.subset_function, self, ss)
  132.  
  133.         if self.output_array is not None:
  134.             for future in concurrent.futures.as_completed(futures):
  135.                 ss, output = future.result()
  136.                 self.output_array[self.subsets_indices[ss]] = output
  137.  
  138.         self.time_delta = time.time() - time_start
  139.         return self.output_array
  140.  
  141.     @staticmethod
  142.     def create_subsets_indices(size, num_subsets):
  143.         """
  144.  
  145.         :param size:
  146.         :param num_subsets:
  147.         :return:
  148.         """
  149.         if size < 1 or num_subsets <= 0: return None
  150.         subset_size = int(size / num_subsets)
  151.         if subset_size < 1: return None
  152.  
  153.         indices = []
  154.         subset_indices = []
  155.         for i in range(size):
  156.             if len(indices) < num_subsets:
  157.                 subset_indices.append(i)
  158.                 if len(subset_indices) == subset_size:
  159.                     indices.append(subset_indices)
  160.                     subset_indices = []
  161.             else:
  162.                 indices[-1].append(i)
  163.  
  164.         return indices
  165.  
  166.     def get_progress(self):
  167.         self.lock.acquire()
  168.         current_pi, end_pi = self.progress
  169.         self.lock.release()
  170.         return current_pi, end_pi
  171.  
  172.     def update_progress(self):
  173.         self.lock.acquire()
  174.         self.progress[0] += 1
  175.         self.lock.release()
  176.  
  177.     def __progress_tracking_daemon(self, cu):
  178.         while True:
  179.             current_pi, end_pi = cu.get_progress()
  180.             if current_pi >= end_pi: break
  181.             printProgressBar(
  182.                 current_pi + 1, end_pi, prefix=f'{cu.progress_tracking_message} {current_pi + 1}/{end_pi}', length=50
  183.             )
  184.         print(' ')
  185.  
  186.    
  187.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement