Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import os
- import logging as log
- import random
- import fcntl
- import subprocess
- import time
- def process_tasklist(tasks, max_parallel, completion_callback):
- """
- Runs a list of tasks by spawning a subprocess for them using the Popen
- API. The passed tasks argument should be a list of dictionaries that have
- at least the following keys:
- "cmd" - which in turn is a list of strings, passed direcly
- to Popen(), such as: ["ls", "-ls"].
- "name" - the name of the task
- The campletion_callback should be a function that accepts 2 arguments;
- the first one will be the completed task dictionary from tasks queue and
- the second one the process return code.
- Args:
- - tasks (list): list of task dictionaries, see above
- - max_parallel (int): maximum number of parallel operations running
- concurrently
- - completion_callback (callable): called when a task completes;
- first argument will be the original task dictionary, the second arg will
- be the (int) return code of the process.
- """
- # Tasks running currently. Contains task descriptors that are objects
- # in this fashion: {"p": Popen object, "task": original task object}
- running_tasks = []
- def poll_task(desc):
- # Polls for task completion; returns True if the task has completed
- p = desc["p"]
- task = desc["task"]
- # Read and log everything available in stdout; readline returns
- # empty string when there is no more input to be read.
- while True:
- try:
- line = p.stdout.readline()
- except IOError, e:
- # No data available in stdout
- break
- if line != "":
- log.debug("STDOUT: %s" % line.strip())
- else:
- # Process has finished
- break
- r = p.poll()
- if r is None:
- log.debug("Task %s hasn't completed yet." % task["name"])
- return False
- else:
- # Read process stdout + stderr
- out, err = p.communicate()
- if r == 0:
- log.debug("Task %s has completed successfully." % task["name"])
- else:
- log.error("Task %s has failed with code %s" %
- (task["name"], r))
- log.error("** STDERR:\n%s" % err.strip())
- completion_callback(task, r)
- return True
- # Loop while there are tasks to be started
- while len(tasks) > 0:
- log.debug("There are %s tasks running." % len(running_tasks))
- # Wait until there is 'room' to run a task
- while len(running_tasks) >= max_parallel:
- log.debug("Polling task completions..")
- for i in xrange(len(running_tasks)):
- desc = running_tasks[i]
- if poll_task(desc):
- running_tasks.pop(i)
- break
- # If we're still 'full', sleep a bit and go for more
- if len(running_tasks) >= max_parallel:
- log.debug("Sleeping a second before polling again..")
- time.sleep(1)
- log.debug("Starting the next task from the queue")
- task = tasks.pop(0)
- # Start the task process
- cmd = task["cmd"]
- log.debug("Running task with command: %s" % cmd)
- p = subprocess.Popen(cmd, bufsize=1, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- # Configure the p.stdout to be non-blocking
- fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
- fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
- # Create a task descriptor and add it to running tasks
- running_tasks.append({"p": p, "task": task})
- log.debug("All tasks deployed, now waiting for them to finish..")
- # Wait for all tasks to finish
- while len(running_tasks) > 0:
- for i in xrange(len(running_tasks)):
- desc = running_tasks[i]
- if poll_task(desc):
- running_tasks.pop(i)
- break
- # If there are incomplete tasks, sleep a while before polling again
- if len(running_tasks) > 0:
- log.debug("Sleeping a second while waiting for tasks to finish..")
- time.sleep(1)
- log.debug("All tasks have completed.")
- def main(argv):
- log.basicConfig(level=log.DEBUG)
- # Number of test tasks to launch
- NUM_TASKS = 3
- # Maximum execution time for test tasks (in seconds)
- MAX_EXEC_TIME = 5
- log.debug("Adding %s tasks." % NUM_TASKS)
- # Create our tasks by allocating a random execution time
- script = os.path.join(os.getcwd(), "task.sh")
- tasks = []
- for i in xrange(NUM_TASKS):
- exectime = random.randint(1, MAX_EXEC_TIME)
- willfail = random.randint(0, 1)
- taskname = "Task%s" % i
- task = {
- "name": taskname,
- "cmd": ["sh", script, taskname, str(exectime), str(willfail)]
- }
- log.debug("Created task: %s" % task)
- tasks.append(task)
- def task_callback(task, return_code):
- log.debug("Task completed: %s with code: %s" % (task, return_code))
- process_tasklist(tasks, 2, task_callback)
- if __name__ == '__main__':
- main(sys.argv)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement