Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- import datetime
- import sys
- from collections import defaultdict
- import re
- import requests
- from genologics.lims import Lims
# LIMS connection settings -- fill in before running.
# NOTE(review): credentials are hard-coded placeholders; consider loading
# them from environment variables or a config file instead.
baseuri = ''   # base URI of the genologics LIMS server
username = ''  # LIMS account user name
password = ''  # LIMS account password
apiurl = ''    # base URL of the reporting REST API queried by depaginate()
# Shared LIMS client used by the sample-lookup helpers below.
l = Lims(baseuri=baseuri, username=username, password=password)
def get_lims_samples(sample_name, lims):
    """Look up samples in the LIMS by name, retrying with legacy name forms.

    Some historical sample ids used a colon where current ids use an
    underscore, so on a miss the name is rewritten and the query retried.

    :param sample_name: the sample name to search for
    :param lims: a genologics ``Lims`` client
    :return: list of matching sample objects (possibly empty)
    """
    samples = lims.get_samples(name=sample_name)
    # FIXME: Remove the hack when we're sure our sample id don't have colon
    if not samples:
        # '..._NN' -> '...:NN'  (raw strings: '\d'/'\g' are invalid escapes otherwise)
        sample_name_sub = re.sub(r"_(\d{2})$", r":\g<1>", sample_name)
        samples = lims.get_samples(name=sample_name_sub)
    if not samples:
        # '...__X_NN' -> '... _X:NN'
        sample_name_sub = re.sub(r"__(\w)_(\d{2})", r" _\g<1>:\g<2>", sample_name)
        samples = lims.get_samples(name=sample_name_sub)
    return samples
def get_lims_sample(sample_name):
    """Return the single LIMS sample matching *sample_name*, or None.

    None is returned both when nothing matches and when the name is
    ambiguous (more than one match).
    """
    matches = get_lims_samples(sample_name, l)
    return matches[0] if len(matches) == 1 else None
def get_expected_yield_for_sample(sample_name):
    """
    Query the LIMS and return the number of bases expected for a sample.

    :param sample_name: the sample name
    :return: number of bases
    """
    sample = get_lims_sample(sample_name)
    nb_gb = None
    if sample:
        nb_gb = sample.udf.get('Yield for Quoted Coverage (Gb)')
    if nb_gb is None:
        # Fall back to the default quote (95 Gb) when the sample is unknown
        # OR when the UDF is unset on an existing sample -- previously
        # int(None) raised a TypeError in the latter case.
        nb_gb = 95
    return int(nb_gb) * 1000000000
def new_sample_passing_threshold(sample_ids_to_info):
    """Mark samples whose accumulated Q30 yield now exceeds their quota.

    Flips ``info['ready']`` from 'no' to 'yes' in place and returns the
    list of sample ids that just crossed the threshold.
    """
    newly_ready = []
    for sample_id, info in sample_ids_to_info.items():
        threshold = get_expected_yield_for_sample(sample_id)
        not_yet_ready = info.get('ready', 'no') == 'no'
        if not_yet_ready and info.get('yield_q30', 0) > threshold:
            info['ready'] = 'yes'
            newly_ready.append(sample_id)
    return newly_ready
def depaginate(url, query, extra):
    """Fetch every page of a paginated REST endpoint and return all records.

    Follows the HAL-style ``_links.next.href`` pointer until exhausted.

    :param url: base URL of the API
    :param query: initial resource query (e.g. 'run_elements?max_results=100')
    :param extra: extra query-string fragment appended to every request
    :return: concatenated list of every page's 'data' entries
    """
    response = requests.get(url + query + extra)
    # Fail fast with a clear HTTPError instead of a KeyError on '_links'
    # when the server returns an error payload.
    response.raise_for_status()
    payload = response.json()
    records = payload.get('data')
    while 'next' in payload['_links']:
        query = payload['_links']['next']['href']
        response = requests.get(url + query + extra)
        response.raise_for_status()
        payload = response.json()
        records.extend(payload.get('data'))
    return records
# Valid values for the --interval command-line option / the interval_type
# argument of aggregate_per_time().
INTERVAL_WEEK = 'week'
INTERVAL_MONTH = 'month'
def aggregate_run_per_week(run_elements):
    """Group run elements by ISO week, parsed from the run_id date prefix (YYMMDD).

    :return: (sorted week labels, dict mapping label -> list of run elements)
    """
    grouped = defaultdict(list)
    for element in run_elements:
        run_date = datetime.datetime.strptime(element.get('run_id')[:6], '%y%m%d')
        iso_year, iso_week, _ = run_date.isocalendar()
        # Monday..Sunday bounds of the week containing run_date.
        monday = run_date - datetime.timedelta(days=run_date.weekday())
        sunday = monday + datetime.timedelta(days=6)
        label = '%s week %02d (%s - %s)' % (
            iso_year, iso_week, monday.strftime('%d/%m'), sunday.strftime('%d/%m')
        )
        grouped[label].append(element)
    # The dict's keys are exactly the week labels seen, so no separate set is needed.
    return sorted(grouped), grouped
def aggregate_run_per_month(run_elements):
    """Group run elements by calendar month, parsed from the run_id prefix (YYMMDD).

    :return: (sorted month labels, dict mapping label -> list of run elements)
    """
    data_per_month = defaultdict(list)
    for run_element in run_elements:
        date = datetime.datetime.strptime(run_element.get('run_id')[:6], '%y%m%d')
        # %02d zero-pads the month number so labels sort chronologically;
        # the original '%02s' space-padded and never zero-filled (a bug).
        month = '%s month %02d (%s)' % (date.year, date.month, date.strftime("%B"))
        data_per_month[month].append(run_element)
    return sorted(data_per_month), data_per_month
def aggregate(run_elements, sample_ids_to_info=None):
    """Summarise one interval's run elements and update cross-interval sample state.

    :param run_elements: list of run-element dicts for this interval
    :param sample_ids_to_info: dict of sample_id -> accumulated info carried
        across intervals; mutated in place. A fresh one is created if None.
    :return: (dict of display-ready metric strings, sample_ids_to_info)
    """
    if sample_ids_to_info is None:
        sample_ids_to_info = defaultdict(dict)
    aggregated_data = {}
    # Only 'useable' elements count toward every metric below.
    useable = [e for e in run_elements if e.get('useable') == 'yes']
    clean_bases_r1 = sum(int(e.get('clean_bases_r1', '0')) for e in useable)
    clean_bases_r2 = sum(int(e.get('clean_bases_r2', '0')) for e in useable)
    clean_q30_bases_r1 = sum(int(e.get('clean_q30_bases_r1', '0')) for e in useable)
    clean_q30_bases_r2 = sum(int(e.get('clean_q30_bases_r2', '0')) for e in useable)
    aggregated_data['Yield'] = '%.2f' % ((clean_bases_r1 + clean_bases_r2) / 1000000000)
    aggregated_data['Yield Q30'] = '%.2f' % ((clean_q30_bases_r1 + clean_q30_bases_r2) / 1000000000)
    passed_runs = set(e.get('run_id') for e in useable if e.get('reviewed') == 'pass')
    aggregated_data['Runs successfull'] = '%d' % len(passed_runs)
    failed_runs = set(e.get('run_id') for e in useable if e.get('reviewed') == 'fail')
    aggregated_data['Runs failed'] = '%d' % len(failed_runs)
    sample_ids = set(e.get('sample_id') for e in useable)
    sample_ids.discard('Undetermined')
    # Samples never seen in a previous interval. This must be computed BEFORE
    # the yield update below: the original computed it afterwards, when every
    # current sample had already been auto-inserted into the defaultdict, so
    # 'New Samples' was always 0 -- and it then reset the accumulator,
    # discarding the cross-interval state.
    new_samples = sample_ids.difference(sample_ids_to_info)
    for sample_id in sample_ids:
        # Q30 bases gained by this sample in this interval (both reads).
        # The original summed raw strings (TypeError) over ALL samples;
        # convert to int and restrict to this sample's run elements.
        new_yield = sum(
            int(e.get('clean_q30_bases_r1', '0')) + int(e.get('clean_q30_bases_r2', '0'))
            for e in useable if e.get('sample_id') == sample_id
        )
        info = sample_ids_to_info[sample_id]
        info['yield_q30'] = info.get('yield_q30', 0) + new_yield
    sample_passing_threshold = new_sample_passing_threshold(sample_ids_to_info)
    aggregated_data['Sample sequenced'] = '%d' % len(sample_ids)
    aggregated_data['New Samples'] = '%d' % len(new_samples)
    aggregated_data['Samples passing threshold'] = '%d' % len(sample_passing_threshold)
    return aggregated_data, sample_ids_to_info
def aggregate_per_time(run_elements, interval_type=INTERVAL_WEEK):
    """Aggregate run elements per week or month and print a TSV report.

    Only the first six intervals (chronologically) are aggregated; rows are
    printed most recent first.

    :param run_elements: list of run-element dicts
    :param interval_type: INTERVAL_WEEK or INTERVAL_MONTH
    :raises ValueError: if interval_type is not one of the two constants
    """
    if interval_type == INTERVAL_WEEK:
        list_intervals, data_per_interval = aggregate_run_per_week(run_elements)
    elif interval_type == INTERVAL_MONTH:
        list_intervals, data_per_interval = aggregate_run_per_month(run_elements)
    else:
        # Previously an unknown value fell through to an UnboundLocalError.
        raise ValueError('unknown interval type: %r' % interval_type)
    headers = [interval_type, 'Yield', 'Yield Q30', 'Runs successfull', 'Runs failed',
               'Sample sequenced', 'New Samples', 'Samples passing threshold']
    sample_ids_to_info = defaultdict(dict)
    aggregate_per_interval = {}
    # Accumulate in chronological order so sample state carries forward correctly.
    for interval in sorted(list_intervals)[:6]:
        aggregated_data, sample_ids_to_info = aggregate(data_per_interval.get(interval), sample_ids_to_info)
        aggregate_per_interval[interval] = aggregated_data
    print('\t'.join(headers))
    for interval in sorted(aggregate_per_interval, reverse=True):
        row = [interval]
        aggregated_data = aggregate_per_interval.get(interval)
        for header in headers[1:]:
            row.append(aggregated_data.get(header))
        print('\t'.join(row))
def get_run_qc(interval_type):
    """Pull all run elements from the reporting API and print the QC report."""
    query = 'run_elements?max_results=100'
    extra = ''
    run_elements = depaginate(apiurl, query, extra)
    aggregate_per_time(run_elements, interval_type)
def main():
    """Parse command-line options and run the QC report."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--interval', type=str, default=INTERVAL_WEEK,
        help="Set the interval in which the metrics will be calculated ("
             + INTERVAL_WEEK + ' or ' + INTERVAL_MONTH + ")"
    )
    options = parser.parse_args()
    get_run_qc(options.interval)


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement