Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python
- import argparse
- import datetime
- import logging
- import math
- import os
- import os.path
- import subprocess
- import threading
- print_details = False
- def timed_call(cmds, timeout, cwd='.', stdin=None, stdout=None):
- # returns: (return code, total time in second, error message)
- scope = [None, None, None]
- def target():
- scope[0] = subprocess.Popen(cmds, cwd=cwd,
- stdin=stdin, stdout=stdout,
- stderr=subprocess.PIPE)
- (scope[1], scope[2]) = scope[0].communicate()
- thread = threading.Thread(target = target)
- t0 = os.times()
- thread.start()
- thread.join(timeout)
- t1 = os.times()
- total_time = (t1[2] - t0[2]) + (t1[3] - t0[3]) # already in second
- if thread.is_alive():
- scope[0].terminate()
- thread.join()
- t1 = os.times()
- total_time = (t1[2] - t0[2]) + (t1[3] - t0[3])
- if scope[0] is not None:
- error = scope[2].replace('\n', ' ').replace('\r', ' ')
- return (scope[0].returncode, total_time, error)
- return (None, total_time, "failed to create subprocess")
- def run(cwd, target, kind, repeat, timeout, in_file_path, out_file_path):
- # returns: (target return code, mean time in second, error message)
- total_time = 0
- for i in range(repeat):
- time = 0
- input_file = None
- output_file = None
- try:
- input_file = open(in_file_path, 'r')
- output_file = open(out_file_path, 'w')
- result = 0
- error = None
- if kind == 'exe':
- (result, time, error) =\
- timed_call([target], timeout=timeout, cwd=cwd,
- stdin=input_file, stdout=output_file)
- elif kind == 'java':
- target_name = os.path.basename(target)
- (result, time, error) =\
- timed_call(['java', '-cp', cwd, target_name],
- timeout=timeout, cwd=cwd,
- stdin=input_file, stdout=output_file)
- if result is None:
- return (result, -1, "result is None")
- elif result != 0:
- return (result, -1, error)
- elif i > 0: # skip the first execution
- total_time += time
- finally:
- if input_file is not None:
- input_file.close()
- if output_file is not None:
- output_file.close()
- mean_time = total_time / (repeat - 1)
- return (0, mean_time, None)
- def calc_path_len(points, path):
- path_len = 0
- for k in range(len(path) - 1):
- i = path[k]
- j = path[k+1]
- (x1, y1) = points[i]
- (x2, y2) = points[j]
- step = math.sqrt((x1 - x2)**2 + (y1 - y2)**2)
- path_len += step
- if print_details:
- print("dist from ({0},{1}) to ({2},{3}) is {4};\tacc dist={5}".format(
- x1, y1, x2, y2, step, path_len))
- return path_len
- def validate(the_points, out_file_path):
- # returns: (successful or not, path len or error message)
- if not os.path.exists(out_file_path) or not os.path.isfile(out_file_path):
- return (False, 'file not found: ' + out_file_path)
- the_num_points = len(the_points)
- path = []
- path_len = 0
- claimed_path_len = 0
- f = None
- try:
- f = open(out_file_path)
- lines = f.readlines()
- # remove empty lines
- lines = [line for line in lines if len(line.strip()) > 0]
- n = len(lines)
- if n > 0:
- for i in range(n-1):
- line = lines[i].strip()
- try:
- p = int(line)
- path.append(p)
- except ValueError:
- print("point index expected but got \"{0}\"".format(line))
- continue
- try:
- claimed_path_len = float(lines[-1])
- except ValueError:
- return (False, "unable to parse path len: " + lines[-1])
- else:
- return (False, "empty output")
- finally:
- if f is not None:
- f.close()
- # 1. path goes through each point once and only once and get back:
- if path[0] != path[-1]:
- return (False, "path not closed")
- points = path[:-1]
- the_point_set = set(range(1000))
- point_set = set(path[:-1])
- missing = the_point_set - point_set
- extra = point_set - the_point_set
- dup = [x for x in point_set if points.count(x) > 1]
- if len(missing) > 0:
- return (False, "missing points: {0}".format(missing))
- if len(extra) > 0:
- return (False, "extra points: {0}".format(extra))
- if len(dup) > 0:
- return (False, "duplicate points: {1}".format(dup))
- # 2. path length error is within 1%
- path_len = calc_path_len(the_points, path)
- error = math.fabs(claimed_path_len - path_len)
- error_rate = error / path_len
- if error_rate > 0.01:
- return (False,
- "claimed len: {0:.0f}, actual len: {1:.0f}"
- .format(claimed_path_len, path_len))
- # all passed!
- return (True, path_len)
- def read_points(file_path):
- points = []
- f = open(file_path)
- try:
- lines = f.readlines()
- for line in lines:
- line = line.strip()
- if line.startswith('#') or len(line) == 0:
- continue
- if line.startswith('\x04') or line.startswith('^D'):
- break
- (sx, sy) = line.split()
- (x, y) = (float(sx), float(sy))
- points.append((x, y))
- finally:
- f.close()
- return points
- def run_and_validate(working_dir, target, kind,
- in_file_path, out_file_path, repeat, timeout):
- points = read_points(in_file_path)
- return_code = 0
- time = 0
- try:
- (code, time, error) = run(working_dir, target, kind, repeat, timeout,
- in_file_path, out_file_path)
- if code == 0:
- (succ, v_res) = validate(points, out_file_path)
- if succ:
- print("success: mean time={0}, path len={1}".format(time, v_res))
- else:
- print("validation error: {0}".format(v_res))
- else:
- print("failed to run: return code={0}, error={1}"
- .format(code, error))
- except Exception as e:
- print("exception: {0}".format(str(e)))
- if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('--dir', '-d', dest='dir', default='.',
- help="The working directory")
- parser.add_argument('--type', '-t', dest='target_type',
- choices=['exe', 'java'], default='exe',
- help="The type of the target")
- parser.add_argument('--timeout', '-s', type=float,
- dest='timeout', default=100,
- help="Max seconds to wait before terminating the run")
- parser.add_argument('--repeat', '-n', type=int,
- dest='repeat', metavar='N', default=5,
- help="Repeat N times to calculate an average time")
- parser.add_argument('--output_file_name', '-o',
- dest='output_file_name', default="output.txt",
- help="Name of the output file")
- parser.add_argument('--details', '-D', dest='details', action='store_true',
- help="Print detailed information")
- parser.add_argument('target',
- help="Name of the executable or Java class")
- parser.add_argument('input_file_path',
- help="Path to the input file")
- args = parser.parse_args()
- print_details = args.details
- working_dir = os.path.abspath(args.dir)
- run_and_validate(working_dir,
- os.path.join(working_dir, args.target),
- args.target_type,
- os.path.abspath(args.input_file_path),
- os.path.join(working_dir, args.output_file_name),
- args.repeat,
- args.timeout)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement