Advertisement
Kala666

popen_parallel.py

Apr 19th, 2019
145
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 5.46 KB | None | 0 0
  1. import sys
  2. import os
  3. import logging as log
  4. import random
  5. import fcntl
  6.  
  7. import subprocess
  8. import time
  9.  
  10. def process_tasklist(tasks, max_parallel, completion_callback):
  11.     """
  12.    Runs a list of tasks by spawning a subprocess for them using the Popen
  13.    API. The passed tasks argument should be a list of dictionaries that have
  14.    at least the following keys:
  15.  
  16.    "cmd" - which in turn is a list of strings, passed direcly
  17.    to Popen(), such as: ["ls", "-ls"].
  18.    "name" - the name of the task
  19.  
  20.    The campletion_callback should be a function that accepts 2 arguments;
  21.    the first one will be the completed task dictionary from tasks queue and
  22.    the second one the process return code.
  23.  
  24.    Args:
  25.      - tasks (list): list of task dictionaries, see above
  26.      - max_parallel (int): maximum number of parallel operations running
  27.        concurrently
  28.      - completion_callback (callable): called when a task completes;
  29.        first argument will be the original task dictionary, the second arg will
  30.        be the (int) return code of the process.
  31.    """
  32.  
  33.     # Tasks running currently. Contains task descriptors that are objects
  34.     # in this fashion: {"p": Popen object, "task": original task object}
  35.     running_tasks = []
  36.  
  37.     def poll_task(desc):
  38.         # Polls for task completion; returns True if the task has completed
  39.         p = desc["p"]
  40.         task = desc["task"]
  41.  
  42.         # Read and log everything available in stdout; readline returns
  43.         # empty string when there is no more input to be read.
  44.         while True:
  45.             try:
  46.                 line = p.stdout.readline()
  47.             except IOError, e:
  48.                 # No data available in stdout
  49.                 break
  50.            
  51.             if line != "":
  52.                 log.debug("STDOUT: %s" % line.strip())
  53.             else:
  54.                 # Process has finished
  55.                 break            
  56.            
  57.         r = p.poll()
  58.         if r is None:
  59.             log.debug("Task %s hasn't completed yet." % task["name"])
  60.             return False
  61.         else:
  62.             # Read process stdout + stderr
  63.             out, err = p.communicate()
  64.            
  65.             if r == 0:
  66.                 log.debug("Task %s has completed successfully." % task["name"])
  67.             else:
  68.                 log.error("Task %s has failed with code %s" %
  69.                           (task["name"], r))
  70.                 log.error("** STDERR:\n%s" % err.strip())
  71.                
  72.             completion_callback(task, r)
  73.             return True
  74.        
  75.     # Loop while there are tasks to be started
  76.     while len(tasks) > 0:
  77.         log.debug("There are %s tasks running." % len(running_tasks))
  78.        
  79.         # Wait until there is 'room' to run a task
  80.         while len(running_tasks) >= max_parallel:
  81.             log.debug("Polling task completions..")
  82.             for i in xrange(len(running_tasks)):
  83.                 desc = running_tasks[i]
  84.                 if poll_task(desc):
  85.                     running_tasks.pop(i)
  86.                     break
  87.  
  88.             # If we're still 'full', sleep a bit and go for more
  89.             if len(running_tasks) >= max_parallel:
  90.                 log.debug("Sleeping a second before polling again..")
  91.                 time.sleep(1)
  92.            
  93.         log.debug("Starting the next task from the queue")
  94.         task = tasks.pop(0)
  95.  
  96.         # Start the task process
  97.         cmd = task["cmd"]
  98.         log.debug("Running task with command: %s" % cmd)
  99.  
  100.         p = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE,
  101.                              stderr=subprocess.PIPE)
  102.  
  103.         # Configure the p.stdout to be non-blocking
  104.         fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
  105.         fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
  106.        
  107.         # Create a task descriptor and add it to running tasks
  108.         running_tasks.append({"p": p, "task": task})
  109.  
  110.     log.debug("All tasks deployed, now waiting for them to finish..")
  111.    
  112.     # Wait for all tasks to finish
  113.     while len(running_tasks) > 0:
  114.         for i in xrange(len(running_tasks)):
  115.             desc = running_tasks[i]
  116.             if poll_task(desc):
  117.                 running_tasks.pop(i)
  118.                 break
  119.            
  120.         # If there are incomplete tasks, sleep a while before polling again    
  121.         if len(running_tasks) > 0:
  122.             log.debug("Sleeping a second while waiting for tasks to finish..")
  123.             time.sleep(1)
  124.            
  125.  
  126.     log.debug("All tasks have completed.")
  127.    
  128. def main(argv):
  129.     log.basicConfig(level=log.DEBUG)
  130.  
  131.     # Number of test tasks to launch
  132.     NUM_TASKS = 3
  133.  
  134.     # Maximum execution time for test tasks (in seconds)
  135.     MAX_EXEC_TIME = 5
  136.  
  137.     log.debug("Adding %s tasks." % NUM_TASKS)
  138.    
  139.     # Create our tasks by allocating a random execution time
  140.     script = os.path.join(os.getcwd(), "task.sh")
  141.     tasks = []
  142.     for i in xrange(NUM_TASKS):
  143.         exectime = random.randint(1, MAX_EXEC_TIME)
  144.         willfail = random.randint(0, 1)
  145.         taskname = "Task%s" % i
  146.         task = {
  147.             "name": taskname,
  148.             "cmd": ["sh", script, taskname, str(exectime), str(willfail)]
  149.         }
  150.         log.debug("Created task: %s" % task)
  151.         tasks.append(task)
  152.  
  153.     def task_callback(task, return_code):
  154.         log.debug("Task completed: %s with code: %s" % (task, return_code))
  155.  
  156.     process_tasklist(tasks, 2, task_callback)
  157.  
  158. if __name__ == '__main__':
  159.     main(sys.argv)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement