Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import os
import re
from pathlib import Path
import concurrent.futures
from threading import Lock
import time
import cv2
import pickle
# multiprocessing.set_start_method('spawn', True)
# Pin the numeric backends (OpenBLAS / MKL / NumExpr / OpenMP) to one thread
# each so they don't oversubscribe the CPU — parallelism in this module comes
# from ThreadPoolExecutor workers (see ConcurrentUtility) instead.
# NOTE: these env vars only take effect if set BEFORE numpy is imported,
# which is why `import numpy` deliberately comes after them.
os.environ['OPENBLAS_NUM_THREADS'] = '1' # 'None'
os.environ['MKL_NUM_THREADS'] = '1' # 'None'
os.environ['NUMEXPR_NUM_THREADS'] = '1' # 'None'
os.environ['OMP_NUM_THREADS'] = '1' # 'None'
import numpy as np
# Keep OpenCV's internal thread pool single-threaded for the same reason.
cv2.setNumThreads(1)
def printProgressBar( iteration, total, prefix='', suffix='', decimals=1, length=100, fill='=' ):
    """
    Call in a loop to create (and overwrite in place) a terminal progress bar.

    No trailing newline is emitted, so successive calls redraw the same line
    via the leading '\\r'.

    @params:
        iteration   - Required  : current iteration (Int)
        total       - Required  : total iterations (Int); if <= 0, nothing is drawn
        prefix      - Optional  : prefix string (Str)
        suffix      - Optional  : suffix string (Str)
        decimals    - Optional  : positive number of decimals in percent complete (Int)
        length      - Optional  : character length of bar (Int)
        fill        - Optional  : bar fill character (Str)
    Example usage:
        for i, img in enumerate(img_files):
            printProgressBar(
                i+1, len(img_files), prefix = f'Loading Data: {i+1}/{len(img_files)}', length = 50
            )
    """
    import sys  # local import: keeps this block self-contained

    # Guard against ZeroDivisionError when there is no work to report.
    if total <= 0:
        return
    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
    filledLength = int(length * iteration // total)
    bar = fill * filledLength + '.' * (length - filledLength)
    # Use sys directly instead of the undocumented `os.sys` alias.
    sys.stdout.write('\r %s |%s| %s%% %s' % (prefix, bar, percent, suffix))
    sys.stdout.flush()
def load_file_paths( root_path, pattern_matcher, recursive=False ):
    """
    loads file paths, uses REGEX for filename/extension pattern matching

    :param root_path: parent directory to load from (str or Path)
    :param pattern_matcher: a compiled REGEX object for filtering filenames
        example1: pattern_matcher = re.compile( r'[.](jpe?g|png)$', re.IGNORECASE ) -> loads all JPGs and PNGs
        example2: pattern_matcher = re.compile( r'.' ) -> loads everything
    :param recursive: True = will look for matches recursively, False = only in root_path
    :return: list of loaded paths (Path objects); [] if pattern_matcher is invalid
    """
    # re.Pattern is the public name of the compiled-regex type (Python 3.8+).
    if not isinstance( pattern_matcher, re.Pattern ):
        print(' Invalid pattern_matcher: need to pass in re.compile(...) REGEX object. ')
        return []
    all_paths = []

    def _walk(directory):
        # Match on the entry NAME only (not the full path), as the original did
        # with os.listdir(); matching directory names are collected too.
        for entry in Path(directory).iterdir():
            if pattern_matcher.search(entry.name):
                all_paths.append(entry)
            if recursive and entry.is_dir():
                _walk(entry)

    _walk(Path(root_path))
    return all_paths
class CacheUtility:
    """
    A class for caching python objects with pickle.

    Paths may be given as either str or pathlib.Path.
    """
    @staticmethod
    def load( cache_file_path, verbose=True ):
        """
        Return the unpickled object stored at cache_file_path,
        or None if the cache file does not exist.
        """
        cache_file_path = Path(cache_file_path)  # accept str or Path
        if cache_file_path.exists():
            # SECURITY: pickle.load executes arbitrary code from the file —
            # only load caches written by CacheUtility.save itself.
            with open(str(cache_file_path), 'rb') as cache_fid:
                data = pickle.load( cache_fid )
            if verbose: print(f" Read from cache: {cache_file_path} ")
            return data
        return None

    @staticmethod
    def save( cache_file_path, data, verbose=True ):
        """
        Pickle `data` to cache_file_path.
        A no-op if the file already exists (existing caches are never overwritten).
        """
        cache_file_path = Path(cache_file_path)  # accept str or Path
        if not cache_file_path.exists():
            with open(str(cache_file_path), 'wb') as cache_fid:
                pickle.dump( data, cache_fid )
            if verbose: print(f" Cached: {cache_file_path} ")
class ConcurrentUtility(object):
    """
    Fan out work over a ThreadPoolExecutor.

    Splits the index range [0, input_array_size) into `num_workers` contiguous
    subsets and runs a user-assigned `subset_function(cu, ss)` on each subset;
    each worker returns (ss, output) and the outputs are scattered back into
    `output_array` at the subset's indices.

    Usage:
        cu = ConcurrentUtility(4, len(data), output_array=[0]*len(data))
        cu.subset_function = my_func   # plain function taking (cu, subset_index)
        result = cu.run()
    """
    def __init__(self,
        num_workers, input_array_size, output_array=None, progressbar_message=None, **kwargs
    ):
        self.num_workers = num_workers
        self.num_subsets = self.num_workers  # one subset per worker thread
        self.input_array_size = input_array_size
        # Copied into a numpy array so fancy-index assignment per subset works.
        self.output_array = np.array(output_array) if output_array is not None else None
        self.kwargs = kwargs  # free-form extras available to subset_function
        # NOTE(review): if input_array_size < num_workers this is None and the
        # sum() below raises TypeError — size workers accordingly. TODO confirm
        # whether a friendlier error is wanted here.
        self.subsets_indices = self.create_subsets_indices(
            size=self.input_array_size, num_subsets=self.num_subsets
        )
        self.subset_function = None  # MUST be assigned by the caller before run()
        self.lock = Lock()  # guards self.progress
        # progress = [items completed so far, total items across all subsets]
        self.progress = [0, sum(len(si) for si in self.subsets_indices)]
        self.progress_tracking_message = 'Processing:' if progressbar_message is None else progressbar_message
        self.time_delta = 0.0  # wall-clock duration of the last run()

    def run(self, verbose=True):
        """
        Execute subset_function over all subsets; return the filled output_array
        (or None if no output_array was supplied).

        Re-raises the first exception any worker raised. With verbose=True a
        progress-bar thread is added on top of the worker threads.
        """
        time_start = time.time()
        futures = [None for _ in range(self.num_subsets)]
        num_workers = self.num_workers
        # Reserve one extra thread for the progressbar so it never starves workers.
        if verbose: num_workers += 1
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
            # submit progress tracking daemon (progressbar):
            if verbose: executor.submit(self.__progress_tracking_daemon, self)
            # submit subset workers:
            for ss in range(self.num_subsets):
                futures[ss] = executor.submit(self.subset_function, self, ss)
            for future in concurrent.futures.as_completed(futures):
                # .result() re-raises worker exceptions instead of silently
                # swallowing them (the original only did this when gathering).
                result = future.result()
                if self.output_array is not None:
                    ss, output = result
                    self.output_array[self.subsets_indices[ss]] = output
        self.time_delta = time.time() - time_start
        return self.output_array

    @staticmethod
    def create_subsets_indices(size, num_subsets):
        """
        Partition range(size) into num_subsets contiguous index lists.

        The first num_subsets-1 lists get size // num_subsets items each; any
        remainder is appended to the LAST list.

        :param size: total number of items to split
        :param num_subsets: number of partitions
        :return: list of index lists, or None if size < 1, num_subsets <= 0,
                 or size < num_subsets
        """
        if size < 1 or num_subsets <= 0: return None
        subset_size = int(size / num_subsets)
        if subset_size < 1: return None
        indices = []
        subset_indices = []
        for i in range(size):
            if len(indices) < num_subsets:
                subset_indices.append(i)
                if len(subset_indices) == subset_size:
                    indices.append(subset_indices)
                    subset_indices = []
            else:
                # All subsets full: overflow indices go to the last subset.
                indices[-1].append(i)
        return indices

    def get_progress(self):
        """Return (items_done, items_total) under the lock."""
        with self.lock:  # with-statement releases even on exception
            current_pi, end_pi = self.progress
        return current_pi, end_pi

    def update_progress(self):
        """Count one more item as done. Call from subset_function per item."""
        with self.lock:
            self.progress[0] += 1

    def __progress_tracking_daemon(self, cu):
        # NOTE(review): busy-spins until progress completes; if workers never
        # call update_progress this loop (and executor shutdown) never ends.
        while True:
            current_pi, end_pi = cu.get_progress()
            if current_pi >= end_pi: break
            printProgressBar(
                current_pi + 1, end_pi, prefix=f'{cu.progress_tracking_message} {current_pi + 1}/{end_pi}', length=50
            )
        print(' ')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement