Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
def threaded_function(params):
    """Worker callable applied to one work item.

    The current body is a logging smoke test: it emits one DEBUG, one INFO
    and one ERROR message through the root logger and does nothing else.

    :param params: the work item (the original docstring describes it as
        device details packed as keyword arguments); not read by the
        current body
    :returns: None -- the body has no ``return`` statement
    """
    # One (level, message) pair per emitted record, in emission order.
    probes = (
        (logging.DEBUG, 'TEST'),
        (logging.INFO, 'TEST2'),
        (logging.ERROR, 'TEST3'),
    )
    for level, text in probes:
        logging.log(level, text)
def main_loop():
    """Configure root logging, then map ``threaded_function`` over ``input_list``.

    Reads these module-level names, which must exist before the call:
    ``args`` (attributes ``log2file``, ``filelevel``, ``log2con``,
    ``conlevel``), ``input_file``, ``input_list``, ``max_cpu_count`` and
    ``results``. Each value produced by the pool is appended to ``results``
    in completion order (``imap_unordered``), not input order.

    NOTE(review): handlers are attached to the root logger of the parent
    process only. Whether worker processes see them depends on the
    multiprocessing start method (inherited with ``fork``, not with
    ``spawn``) -- confirm for the target platform.

    :returns: None
    """
    # Silence the SSH libraries' own loggers so they neither emit nor
    # propagate records into the root handlers configured below.
    for library in ("netmiko", "paramiko"):
        library_logger = logging.getLogger(library)
        library_logger.propagate = False
        library_logger.disabled = True

    log_time = datetime.now().strftime("%Y%m%d-%H%M%S")
    log_file = os.path.basename(input_file) + '.txt'

    # The root logger passes everything; each handler applies its own level.
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    if args.log2file:
        # Imported here because a bare ``import logging`` does not load the
        # ``logging.handlers`` submodule.
        from logging.handlers import RotatingFileHandler

        log_format_file = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        # Rotate at 1 MiB, keeping up to 10 old files.
        rfh = RotatingFileHandler(log_file, maxBytes=1048576, backupCount=10)
        rfh.setLevel(args.filelevel)
        rfh.setFormatter(log_format_file)
        logger.addHandler(rfh)

    if args.log2con:
        log_format_con = logging.Formatter('%(message)s')
        ch = logging.StreamHandler()
        ch.setLevel(args.conlevel)
        ch.setFormatter(log_format_con)
        logger.addHandler(ch)

    if not args.log2file and not args.log2con:
        # No destination requested: switch the root logger off entirely.
        logger.disabled = True
        logger.propagate = False

    logging.info("JOB STARTED: %s", log_time)

    # The context manager terminates the worker processes on exit, including
    # when a worker raises; the original never released the pool. All results
    # have been consumed by the time the block exits normally.
    with multiprocessing.Pool(max_cpu_count) as pool:
        # The progress bar is shown only when console logging is enabled.
        with tqdm(total=len(input_list), ascii=True, unit="dev", desc='processing',
                  disable=not args.log2con) as progress:
            for result in pool.imap_unordered(threaded_function, input_list):
                progress.update(1)
                results.append(result)
# Start the job only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main_loop()
Add Comment
Please, Sign In to add comment