Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #twopassqcomp.py version
- twppassqcompver = '0.0.1'
- import sys, os, subprocess, shlex, argparse, time
- def log(line,logfile='twopassqcomp.log'):
- with open(logfile,'a') as l:
- l.write(line+'\n')
- l.closed
- print line
- def setrange(txt,step=0,count=0,seek=0):
- with open('test.avs', 'w') as out:
- for line in txt:
- out.write(line)
- if not (step == 0 or count == 0):
- out.write('\nSelectRangeEvery('+str(step)+','+str(count)+','+str(seek)+')')
- log('\nSetting SelectRangeEvery('+str(step)+','+str(count)+','+str(seek)+')\n')
- out.closed
- def readavs(source):
- """Read the .avs file into memory so we can use it to create our own test.avs"""
- res = []
- with open(source,'r') as avs:
- while (True):
- line = avs.readline()
- if line == '':
- break
- elif (line[:16].lower() == 'selectrangeevery'):
- log('\nYOUR .AVS FILE CONTAINS A SELECTRANGEEVERY() LINE. REMOVE IT AND TRY AGAIN.')
- exit()
- else:
- res.append(line)
- avs.closed
- return res
- def floatrange(i,j,k):
- x = range(int(100*i),int(100*j),int(100*k))
- y = list([z/100.0 for z in x])
- return y
- class TwoPassTests:
- def __init__(self, keep=False, parameters={}):
- self.keeptests = keep
- self.results = {}
- self.params = parameters
- with open('twopassqcomp.log','w') as logfile:
- logfile.write('2passqcomp version '+twppassqcompver+'\n')
- logfile.closed
- def set(self, p, v=''):
- self.params[p] = str(v)
- def rem(self, p):
- if p in self.params:
- del self.params[p]
- def setrange(txt,step=0,count=0,seek=0):
- with open('test.avs', 'w') as out:
- for line in txt:
- out.write(line)
- if not (step == 0 or count == 0):
- out.write('\nSelectRangeEvery('+str(step)+','+str(count)+','+str(seek)+')')
- log('\nSetting SelectRangeEvery('+str(step)+','+str(count)+','+str(seek)+')\n')
- out.closed
- def string(self):
- param_string = ''
- for k,v in self.params.iteritems():
- param_string += ' --' + k
- if v != '': param_string += ' ' + v
- return param_string
- def override(self,setting,tests,msg=''):
- if setting in tests:
- print 'Input a value to use for '+setting+', or press ENTER to use the computed value.'
- override = raw_input()
- print
- if override != '':
- self.set(setting,override)
- log('User chose '+setting+' '+override)
- else:
- override = ''
- while override == '':
- print 'Input a value to use for '+setting+'. '+msg
- override = raw_input()
- self.set(setting,override)
- log('User chose '+setting+' '+override)
- def test_encode(self,outfile='test.mkv',use_previous=True,verbose=True):
- self.rem('output')
- if self.string() in self.results and use_previous:
- info = self.results[self.string()]
- log('Using previously-recorded data from the following parameters:\n' + self.string())
- log('\nPreviously-recorded results:\nframes:\t\t' + str(info['frames']) +
- '\nssim:\t\t' + str(info['ssim']) + '\ndb:\t\t' + str(info['db']) + '\nbitrate:\t' +
- str(info['bitrate']) + '\nfps:\t\t' + str(info['fps']) + '\n')
- return info
- if self.keeptests or outfile.startswith('psy-rd'):
- self.set('output',outfile)
- else:
- self.set('output','test.mkv')
- info = {}
- if verbose: log('Starting test encode with the following parameters:\n' + self.string())
- with open('workfile.txt', 'w') as workfile:
- param_string = self.string()
- param_string = 'pipebuf avs2yuv test.avs -o - : x264 - --stdin y4m' + param_string
- args = shlex.split(param_string)
- ###Find the current time (which is a funky long number that docs.python.org chapter 15 describes as "Return the time as a floating point number expressed in seconds since the epoch, in UTC." We will then subtract the end time to find duration of the test.###
- start = time.time()
- p = subprocess.call(args,bufsize=-1,stdout=workfile,stderr=subprocess.STDOUT)
- ###Find the current time after the test is over to go with the time we found just before the test started so that we can compute how long this test took to run.###
- end = time.time()
- ###Subtract the end time from the start time (note that they are both measured in seconds) to find how long this test took to run and save it as info['testtime'].
- info['testtime'] = (end - start)
- workfile.closed
- with open('workfile.txt', 'r') as workfile:
- while (True):
- line = workfile.readline()
- if line == '':
- break
- elif 'consecutive B-frames' in line:
- bframe_list = [float(s[:-1]) for s in shlex.split(line)[4:]]
- info['bframes'] = [0,bframe_list]
- for i in range(len(bframe_list)):
- if bframe_list[i] >= 1.0: info['bframes'][0] = i
- elif 'SSIM Mean' in line:
- metric_list = shlex.split(line)
- info['ssim'] = float(metric_list[4][2:])
- info['db'] = float(metric_list[5][1:-3])
- elif line.startswith('encoded'):
- rate_list = shlex.split(line)
- info['frames'] = int(rate_list[1])
- info['fps'] = float(rate_list[3])
- info['bitrate'] = float(rate_list[5])
- workfile.closed
- if info == {}:
- log('\nTEST ENCODE FAILED.')
- exit()
- log('\nFinished test encode with the following results:\nframes:\t\t' + str(info['frames']) + '\nssim:\t\t' + str(info['ssim']) +
- '\ndb:\t\t' + str(info['db']) + '\nbitrate:\t' + str(info['bitrate']) +
- '\nfps:\t\t' + str(info['fps']) + '\ntest time:\t' + str(info['testtime']) + '\nfile size:\t' + str(info['filesize']) + '\n')
- self.rem('output')
- self.results[self.string()] = info
- return info
- def twopass(x264,q):
- x264.set('qcomp',q)
- log('\nRunning twopass tests with qcomp = '+str(q)+'...\n')
- tests = []
- #test bitrate in the range of start to end, in increments of step.
- ##############################
- ## BITRATE TEST VALUES ##
- ##############################
- start = 2541
- end = 3141
- step = -150
- ##############################
- for br in floatrange(start, end+step, step):
- x264.set('bitrate',br)
- for p in ['1','2']:
- self.set('pass',p)
- log('bitrate (in Kbps) = ' + str(br) + ', pass = ' + p)
- test = x264.test_encode('qcomp_'+str(q)+'_bitrate_'+str(br)+'.mkv')
- with open('twopassqcomp.txt','a') as csv:
- csv.write(str(q)+','+str(br)+','+str(p)+','+str(test['ssim'])+','+str(test['db'])+','+str(test['bitrate'])+'\n')
- csv.closed
- if __name__ == '__main__':
- ################## CLI OPTIONS ##################
- supported_tests = set(['twopass'])
- parser = argparse.ArgumentParser(description='me.py - Automated testing of various me encoding options', usage='%(prog)s infile.avs [options]')
- parser.add_argument('--version', action='version', version='%(prog)s version '+twppassqcompver)
- parser.add_argument('infile', help='source avisynth script')
- parser.add_argument('--hd', action='store_true', help='set this flag if the source is high definition')
- parser.add_argument('--sar', default=argparse.SUPPRESS, help='sample aspect ratio - mandatory for SD sources')
- parser.add_argument('--keep', action='store_true', help = 'set this flag if you want to keep all test encodes for manual \
- comparison (this may take up lots of space)')
- parser.add_argument('--only', metavar='test', dest='test', nargs='+', help='this option overrides the standard procedure \
- and only runs the specified tests')
- cli = vars(parser.parse_args())
- ################## CLI CHECKS ##################
- if not os.path.isfile(cli['infile']):
- parser.exit(status=2, message='\nError: Input file ' + cli['infile'] + ' does not exist.')
- elif not cli['infile'].endswith('.avs'):
- parser.exit(status=2, message='\nError: Input file must be an avisynth script (*.avs).')
- else:
- source = cli['infile']
- if (not cli['hd']) and ('sar' not in cli):
- parser.exit(status=2, message='\nError: SAR required for SD sources.')
- if cli['test'] is None:
- cli['test'] = set(['twopass'])
- else:
- for x in cli['test']:
- if x not in supported_tests: parser.exit(status=2, message='\nError: unsupported test '+x+'.')
- cli['test'] = set(cli['test'])
- # read the avs file, then add quotes around it so we can use it in the command line
- avstext = readavs(source)
- source = '"' + source + '"'
- ################# FIND WIDTH, HEIGHT, FRAME COUNT #################
- with open('workfile.txt', 'w') as workfile:
- args = 'pipebuf avs2yuv ' + source + ' -frames 1 -o - : x264 - --stdin y4m --output test.mkv'
- args = shlex.split(args)
- p = subprocess.call(args,bufsize=-1,stdout=workfile,stderr=subprocess.STDOUT)
- workfile.closed
- with open('workfile.txt', 'r') as workfile:
- while (True):
- line = workfile.readline()
- if line == '':
- break
- elif line.endswith('frames\n'):
- frames = int(line.split()[-2])
- res = line.split()[-5][:-1]
- res = res.split('x')
- width = int(res[0])
- height = int(res[1])
- workfile.closed
- ################# INITIAL X264 PARAMETERS #################
- ref = min(16,(8388608/(width*height)))
- x264 = TwoPassTests(keep=cli['keep'])
- if 'sar' in cli:
- x264.set('sar',cli['sar'])
- x264.set('ref',ref)
- x264.set('ssim')
- x264.set('direct','auto')
- x264.set('bframes',7)
- x264.set('b-adapt',2)
- x264.set('b-pyramid','normal')
- x264.set('psy-rd','1.0:0.0')
- x264.set('deblock','-3:-3')
- x264.set('subme',10)
- x264.set('trellis',2)
- x264.set('merange',24)
- x264.set('aq-mode',2)
- x264.set('rc-lookahead',250)
- x264.set('analyse','all')
- x264.set('no-fast-pskip')
- x264.set('thread-input')
- x264.set('passlogfile')
- x264.set('2passtry.log')
- from config import *
- config = startingOptions(x264)
- trange = config[1]
- ############### TEST ENCODING PROCEDURE #################
- prologue = '\nSTARTING twopass/QCOMP TESTING PROCESS FOR:\n' + source + '\n\nCommand line options:\n'+str(sys.argv[1:])+'\n\nParsed options:\n'
- for k,v in cli.iteritems():
- prologue += k + ': ' + str(v) + '\n'
- prologue += '\nSource width: '+str(width)+'\nSource height: '+str(height)+'\nFrame count: '+str(frames)+'\n\nx264 options:\n'+x264.string()+'\n'
- log(prologue)
- ###If the 'twopassqcomp' test is set to run then make a .csv file called crfme.csv and write me.py version (with the version number) and infile (with the .avs file path) on the first line, labels on the 2nd line, and data on the lines after that (one line per test)###
- if ('twopassqcomp' in cli['test']):
- setrange(avstext,trange[0],trange[1],trange[2])
- with open('twopassqcomp.csv','w') as csv:
- csv.write('twopassqcomp.py version'+','+twppassqcompver+','+'infile'+','+source+'\nme,crf,pass,SSIM,db,bitrate,fps,test time(in seconds),file size(in bytes)\n')
- csv.closed
- ##################
- start = 0.7
- end = 0.6
- step = -0.05
- ##################
- for q in floatrange(start,end+step,step):
- x264.set('qcomp',q)
- twopass(x264,q)
- ### DONE ###
- x264.rem('ssim')
- x264.rem('output')
- log('\n\nTESTING COMPLETE!')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement