Advertisement
Guest User

Untitled

a guest
Oct 23rd, 2019
154
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.25 KB | None | 0 0
  1. from functions import *
  2.  
  3. import threading
  4. from queue import Queue
  5. import getpass
  6. import argparse
  7. import logging
  8. import sys
  9. from tqdm import tqdm
  10. import time
  11.  
  12. # argument parsing
  13. parser = argparse.ArgumentParser(description="Add devices to Backbox from a CSV File")
  14. parser.add_argument("-a", "--address", metavar="Backbox_IP", required=True, help="Backbox IP-Address")
  15. parser.add_argument("-u", "--user", metavar="User_Name", required=True, help="Backbox user name")
  16. parser.add_argument("-f", "--file", metavar="CSV_File", required=True, help="csv file to devices load from")
  17. parser.add_argument("-d", "--debug_level", type=int, choices=[10, 20, 30, 40, 50], metavar="debug_level", help=argparse.SUPPRESS, default=20) # argparse with specific choices
  18. parser.add_argument("-w", "--workers", type=int, metavar="workers", help=argparse.SUPPRESS, default=5) # supress message from the help menu
  19. parser.add_argument("-p", "--progress_bar", type=bool, choices=[True, False], metavar="progress_bar", help="Add progress bar (will show printouts only if an error/warning occurred)", default=False)
  20. parser.add_argument("-e", "--show_example", metavar="Show example of CSV File", help="Show CSV file example", type=show_example, action="store") # run function in case that the variable was choosen
  21. args = parser.parse_args()
  22. file = args.file
  23. user = args.user
  24. address = args.address
  25. debug_level = args.debug_level
  26. workers = args.workers
  27. progress_bar = args.progress_bar
  28. password = getpass.getpass("Please enter password: ")
  29.  
  30. if progress_bar is True: # start the progress bar only if the flag provided
  31. progress = tqdm()
  32. debug_level = 30 # in case that the progress bar selected, show only errors in order not to interupt the progress bar
  33.  
  34. # debug configuration
  35. logging.basicConfig(stream=sys.stdout, level=debug_level, format='%(asctime)s - MODULE: %(name)s, THREAD: "%(threadName)s" - %(levelname)s - %(message)s') # log to stdout
  36. logger = logging.getLogger()
  37.  
  38. devices = csv_to_json(file) # convert csv to the json object
  39. cookies = login_to_back_box(address=address, user=user, password=password) # get the login cookies
  40.  
  41. start = time.time()
  42. total_jobs_run = 0 # the total jobs that we run already
  43. total_devices = len(devices) # the total jobs that we need to run
  44. print_lock = threading.Lock()
  45.  
  46. # worker that contasin the function that we want to run in a tread
  47. def worker_thread(device_, cookies_, address_, q_):
  48. global total_jobs_run # use the veriable that contains that total jobs that we run in the thread in order to know when to stop adding jobs to the queue
  49. global total_devices
  50.  
  51. job = q_.get() # get a job from the queue
  52. logger.debug(f"Number of jobs in the queue before running task: {q.qsize()}")
  53. create_device(device_, cookies_, address_)
  54. total_jobs_run += 1 # increment the number of jobs that we already run
  55.  
  56. if progress_bar is True: # use the progress bar only if it was defined to run with argparse
  57. global progress
  58. progress.total = total_devices * 50
  59. with print_lock:
  60. progress.update(total_jobs_run)
  61. progress.refresh()
  62.  
  63. logger.debug(f"TOTAL RUN: {total_jobs_run}")
  64. logger.debug(f"CURRENT RUN: {total_devices}")
  65. q_.put(1) # put a job in the queue in order to other thread to start
  66. q_.task_done() # notify that the job was done in order to the queue.join to know that all jobs were done.
  67.  
  68. if q_.qsize() == workers and workers == 1: # if the worker == 1 and the q.size() == 1 then there is a need to do task_done only once.
  69. logger.debug(f"TOTAL DEVICES: {total_devices}, TOTOAL JOB RUN: {total_jobs_run}, WORKERS: {workers}, QSIZE: {q_.qsize()}")
  70. if total_devices == total_jobs_run:
  71. if q_.qsize() == 1:
  72. try: # in case that somehow the queue is already empty do not show an exception
  73. q_.task_done()
  74. except:
  75. return
  76.  
  77. logger.debug(f"QSIZE WHEN DONE: {q_.qsize()}")
  78.  
  79. elif workers == q_.qsize(): # if the queue size == workers all the jobs were done and the exact number of workers equale to the number of the jobs since in every job completion we are adding more job to the queue
  80. while not q_.empty():
  81. try:
  82. q_.task_done()
  83. except:
  84. return
  85.  
  86.  
  87. q = Queue()
  88.  
  89. # add the devices to backbox
  90. for device in devices:
  91. t = threading.Thread(target=worker_thread, args=(device, cookies, address, q))
  92. t.name = device['deviceName']
  93. t.daemon = False # dont close the application when the main thread stops, wait for all the threads to complete before closing the main thread. if not configured, the main thread will finish but the threads will continue working
  94. t.start()
  95.  
  96. # put few workers to work by adding jobs in order to start execution, the number of workers will also be the number of starting threads
  97. # every worker in this app will add additional job in completion
  98. for job in range(workers):
  99. q.put(job)
  100.  
  101. q.join() # its a blocking function, wait for all the jobs to compelte, will wait until there is no jobs in the queue (will not affect in case that the threads will be configured as deamon). running task done on all jobs will cause this blocking function to move forward (when q.size() is empty) .
  102. logger.info('Job took: ' + str(time.time()-start) + "seconds")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement