Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import glob
- import os
class bam_pipeline:
    """FASTQ -> analysis-ready BAM pipeline for tumor/germline samples.

    Steps: map reads (BWA-MEM or Bowtie2) per lane, sort, merge lane-level
    BAMs (Picard MergeSamFiles), mark/remove duplicates (Picard
    MarkDuplicates), then GATK3 indel realignment and base-quality
    recalibration.  Every external command is dispatched through
    ``system_command_send``, which appends a JSON log entry per step to
    ``log_file.txt``.
    """

    def __init__(self, working_directory, map_type, sample_type, library_matching_id, thrds):
        """Store run parameters and tool paths, and write an initial log entry.

        working_directory    -- folder containing the input *.fastq.gz files
        map_type             -- "Bwa" or "Bowtie" (selects the aligner)
        sample_type          -- "Tumor" or "Germline" (controls FASTQ name parsing)
        library_matching_id  -- read-group LB value
        thrds                -- thread count; coerced to str because it is
                                spliced directly into shell command strings
        """
        self.working_directory = working_directory
        self.map_type = map_type
        self.sample_type = sample_type
        self.library_matching_id = library_matching_id
        self.threads = str(thrds)
        self.logs = {'function': "", 'command': "", 'start_time': "", 'end_time': "",
                     'threads': self.threads, 'success': 0}
        self.write_logs()
        # NOTE(review): machine-specific tool/reference paths; consider
        # promoting these to constructor parameters.
        self.picard_path = "/home/bioinformaticslab/Desktop/AMBRY/picard.jar"
        self.gatk_path = "/home/bioinformaticslab/Desktop/AMBRY/GenomeAnalysisTK.jar"
        self.ref_dir = "/home/bioinformaticslab/Desktop/GenomicsWorks/ref_genome_indexes/"
        self.bundle_dir = self.ref_dir + "hg19_bundle"

    def get_fastq(self):
        """Chdir into the working directory and return the *.fastq.gz basenames.

        Two extensions are stripped (".gz" then ".fastq") so the returned
        names can be recombined with ".fastq.gz" later.
        """
        os.chdir(self.working_directory)
        self.main_dir = os.getcwd()
        all_fastq_files = glob.glob("*fastq.gz")
        return [os.path.splitext(os.path.splitext(name)[0])[0] for name in all_fastq_files]

    def get_info(self, fastq_list):
        """Parse Illumina-style FASTQ basenames into their underscore-separated parts.

        Expected layouts:
            Tumor:    <Sample>_<Index>_<Lane>_<Pair>_<Seq>
            Germline: <Sample>_<GermlineDNA>_<Index>_<Lane>_<Pair>_<Seq>

        Returns a dict of de-duplicated component lists.
        Raises ValueError when sample_type is neither "Tumor" nor "Germline".
        """
        sample_ID, germline_dna, index_seq, lanes, pairs_r, n_of_seq = (set() for _ in range(6))
        if self.sample_type == "Tumor":
            for name in fastq_list:
                fields = name.split("_")
                sample_ID.add(fields[0])
                index_seq.add(fields[1])
                lanes.add(fields[2])
                pairs_r.add(fields[3])
                n_of_seq.add(fields[4])
            return {"Sample_ID": list(sample_ID), "Index": list(index_seq), "Lanes": list(lanes),
                    "Pairs": list(pairs_r), "Number_of_seq": list(n_of_seq)}
        # BUG FIX: the original compared `self` (the instance) to "Germline",
        # which is never true, so the Germline branch was unreachable.
        elif self.sample_type == "Germline":
            for name in fastq_list:
                fields = name.split("_")
                sample_ID.add(fields[0])
                germline_dna.add(fields[1])
                index_seq.add(fields[2])
                lanes.add(fields[3])
                pairs_r.add(fields[4])
                n_of_seq.add(fields[5])
            return {"Sample_ID": list(sample_ID), "Germline": list(germline_dna),
                    "Index": list(index_seq), "Lanes": list(lanes),
                    "Pairs": list(pairs_r), "Number_of_seq": list(n_of_seq)}
        else:
            # BUG FIX: the original printed a placeholder and fell through,
            # returning None; raise as the placeholder message intended.
            raise ValueError("Invalid sample type %r: expected 'Tumor' or 'Germline'"
                             % self.sample_type)

    def mapping(self):
        """Map each lane/chunk pair of FASTQs with BWA or Bowtie2, then sort.

        Builds the @RG read-group fields (flowcell taken from the first FASTQ
        header line), emits one BAM per (lane, chunk) combination named
        "<map_type>_<sample>_<index>_<lane>_<chunk>.bam", and sorts each one
        via convert_sort().
        """
        import re
        import gzip
        fastq_list = self.get_fastq()
        info_dict = self.get_info(fastq_list)
        RG_SM = info_dict["Sample_ID"][0]
        RG_PL = "Illumina"
        RG_LB = self.library_matching_id
        first_fastq_file_dir = self.working_directory + "/" + fastq_list[0] + ".fastq.gz"
        # Illumina header: @<instrument>:<run>:<flowcell>:... -> field 3 is the flowcell.
        with gzip.open(first_fastq_file_dir) as f:
            first_line = f.readline().decode()
        flowcell_info = first_line.split(":")[2]
        for lane in info_dict["Lanes"]:
            for chunk in info_dict["Number_of_seq"]:
                r1 = re.compile(".*" + lane + "_R1_" + chunk)
                read1 = [s + ".fastq.gz" for s in fastq_list if r1.match(s)]
                r2 = re.compile(".*" + lane + "_R2_" + chunk)
                read2 = [s + ".fastq.gz" for s in fastq_list if r2.match(s)]
                # lane[-1] is the lane digit (e.g. "L001" -> "1").
                RG_ID = flowcell_info + "." + lane[-1]
                RG_PU = flowcell_info + "." + info_dict["Index"][0] + "." + lane[-1]
                gene_origin = self.map_type + "_" + info_dict["Sample_ID"][0] + "_" + \
                    info_dict["Index"][0] + "_" + lane + "_" + chunk + ".bam"
                if self.map_type == "Bwa":
                    add_read_group = ' -R "@RG\\tID:' + RG_ID + '\\tSM:' + RG_SM + \
                        '\\tLB:' + RG_LB + '\\tPL:' + RG_PL + '\\tPU:' + RG_PU + '" '
                    map_bam = "bwa mem -t " + self.threads + " " + add_read_group + \
                        self.ref_dir + "Bwa/ucsc.hg19.fasta " + read1[0] + " " + read2[0] + \
                        " | samtools view -@" + self.threads + " -bS - > " + gene_origin
                elif self.map_type == "Bowtie":
                    add_read_group = " --rg-id " + RG_ID + " --rg SM:" + RG_SM + \
                        " --rg LB:" + RG_LB + " --rg PL:" + RG_PL + " --rg PU:" + RG_PU
                    map_bam = "bowtie2 -p" + self.threads + add_read_group + " -x " + \
                        self.ref_dir + "Bowtie/hg_19_bowtie2 -1 " + read1[0] + " -2 " + read2[0] + \
                        " | samtools view -@" + self.threads + " -bS - > " + gene_origin
                else:
                    return "Please specify the map type Bwa/Bowtie "
                self.system_command_send(map_bam, "mapping")
                self.convert_sort(gene_origin)

    def convert_sort(self, sort_gene_origin):
        """Coordinate-sort a BAM into 'SortedBAM_<name>' with samtools sort."""
        convert_sort = "samtools view -@" + self.threads + " -bS " + sort_gene_origin + \
            " | samtools sort -@" + self.threads + " -o SortedBAM_" + sort_gene_origin
        self.system_command_send(convert_sort, "mapping_function;convert_sort_command")

    def merge_bams(self):
        """Merge all SortedBAM_* files into '<map_type>_<sample>_MergedBAM.bam' (Picard)."""
        os.chdir(self.working_directory)
        fastq_list = self.get_fastq()
        info_dict = self.get_info(fastq_list)
        all_bam_files = glob.glob("SortedBAM*")
        print(all_bam_files)
        inputs_list = "".join("I=" + bam + " " for bam in all_bam_files)
        output_name = self.map_type + "_" + info_dict["Sample_ID"][0] + "_MergedBAM.bam"
        merge_command = "java -XX:ParallelGCThreads=" + self.threads + \
            " -jar " + self.picard_path + " MergeSamFiles " + inputs_list + \
            " O=" + output_name + " USE_THREADING=true"
        self.system_command_send(merge_command, "merge_bams")

    def mark_duplicate(self):
        """Remove PCR duplicates from the merged BAM with Picard MarkDuplicates."""
        merged_bam = glob.glob("*_MergedBAM.bam")
        # Prefix already ends with "_"; the original added a second "_"
        # producing "..._mdup_removed__..." in the output filename.
        mark_prefix_removed = self.map_type + "_mdup_removed_"
        picardcommand = "java -XX:ParallelGCThreads=" + self.threads + \
            " -jar " + self.picard_path + " MarkDuplicates I=" + merged_bam[0] + \
            " O=" + mark_prefix_removed + merged_bam[0] + \
            " M=marked_dup_metrics.txt REMOVE_DUPLICATES=true CREATE_INDEX=true"
        self.system_command_send(picardcommand, "mark_duplicate")

    def gatk_RealignTargetCreator(self):
        """GATK3 RealignerTargetCreator: write indel intervals to realign_target.intervals."""
        bamstr = self.map_type + "_mdup_removed*.bam"
        print(bamstr)
        lastbam = glob.glob(bamstr)
        bcal = "java -jar " + self.gatk_path + " -T RealignerTargetCreator -nt " + \
            self.threads + " -R " + self.bundle_dir + "/ucsc.hg19.fasta -known " + \
            self.bundle_dir + "/Mills_and_1000G_gold_standard.indels.hg19.vcf -I " + lastbam[0] + \
            " -o realign_target.intervals"
        self.system_command_send(bcal, "GATK_RealignTargetCreator")

    def gatk_IndelRealigner(self):
        """GATK3 IndelRealigner: realign around the intervals from the previous step."""
        bamstr = self.map_type + "_mdup_removed*.bam"
        lastbam = glob.glob(bamstr)
        realigned_last_bam = "IndelRealigned_" + lastbam[0]
        bcal = "java -jar " + self.gatk_path + " -T IndelRealigner -R " + self.bundle_dir + \
            "/ucsc.hg19.fasta -known " + \
            self.bundle_dir + "/Mills_and_1000G_gold_standard.indels.hg19.vcf" + \
            " -targetIntervals realign_target.intervals --noOriginalAlignmentTags -I " + \
            lastbam[0] + " -o " + realigned_last_bam
        self.system_command_send(bcal, "GATK_IndelRealigner")

    def gatk_BaseRecalibrator(self):
        """GATK3 BaseRecalibrator: compute the BQSR table (*_bqsr.grp)."""
        bamstr = "IndelRealigned_*.bam"
        lastbam = glob.glob(bamstr)
        basequalityscore = str(lastbam[0]).split(".")[0] + "_bqsr.grp"
        bcal = "java -jar " + self.gatk_path + " -T BaseRecalibrator -R " + self.bundle_dir + \
            "/ucsc.hg19.fasta -I " + \
            lastbam[0] + " -knownSites " + self.bundle_dir + \
            "/Mills_and_1000G_gold_standard.indels.hg19.vcf" + \
            " -o " + basequalityscore
        self.system_command_send(bcal, "GATK_BaseRecalibrator")

    def gatk_PrintReads(self):
        """GATK3 PrintReads: apply the BQSR table, producing the final recalibrated BAM."""
        bamstr = "IndelRealigned_*.bam"
        lastbam = glob.glob(bamstr)
        bqsr = glob.glob("*.grp")[0]
        aftercalibratorBam = "Completeted_BaseCalibrator_" + lastbam[0]
        # BUG FIX: the original joined the tail with commas, building a tuple
        # instead of a command string, so PrintReads never ran correctly.
        bcal = "java -jar " + self.gatk_path + " -T PrintReads -R " + self.bundle_dir + \
            "/ucsc.hg19.fasta -I " + \
            lastbam[0] + " --BQSR " + bqsr + " -o " + aftercalibratorBam
        self.system_command_send(bcal, "GATK_PrintReads")

    def run_gatks(self):
        """Run the four GATK3 steps in order."""
        self.gatk_RealignTargetCreator()
        self.gatk_IndelRealigner()
        self.gatk_BaseRecalibrator()
        self.gatk_PrintReads()

    def run_pipeline(self):
        """Run the whole pipeline: map, merge, dedup, GATK, then tidy files."""
        fastqs = self.get_fastq()
        self.get_info(fastqs)  # validates sample_type early (raises on bad value)
        self.mapping()
        self.merge_bams()
        self.mark_duplicate()
        self.run_gatks()
        self.create_folder()

    def system_command_send(self, command, from_function):
        """Run a shell command via os.system and log start/end/success.

        Returns None on success; on an unexpected Python-level error, logs
        failure and returns a short diagnostic string (original behavior).
        """
        from datetime import datetime
        self.logs["function"] = from_function
        self.logs["command"] = command
        self.logs["start_time"] = str(datetime.now())
        self.logs["threads"] = self.threads
        try:
            # BUG FIX: os.system does not raise when the command fails, so the
            # original always logged success=1; use the exit status instead.
            exit_code = os.system(command)
            self.logs["end_time"] = str(datetime.now())
            self.logs["success"] = 1 if exit_code == 0 else 0
            self.write_logs()
            self.clean_log()
        except Exception:
            self.logs["end_time"] = str(datetime.now())
            self.logs["success"] = 0
            self.write_logs()
            self.clean_log()
            return from_function + " give error with this command -> " + command

    def write_logs(self):
        """Append the current log record as JSON (comma-terminated) to log_file.txt."""
        import json
        with open('log_file.txt', 'a') as file:
            file.write(json.dumps(self.logs))
            file.write(",")

    def clean_log(self):
        """Reset the log record to its empty template."""
        self.logs = {'function': "", 'command': "", 'start_time': "", 'end_time': "",
                     'threads': self.threads, 'success': 0}

    def create_folder(self):
        """Move all non-FASTQ outputs into a '<map_type>' subfolder of the working dir."""
        import shutil
        os.chdir(self.working_directory)
        mk_dir = self.working_directory + "/" + self.map_type
        os.mkdir(mk_dir)
        all_files = glob.glob("*.*")
        for file in all_files:
            # Keep the input *.gz FASTQs in place; move everything else.
            if file[-2:] != "gz":
                print(file)
                shutil.move(self.working_directory + "/" + file, mk_dir + "/" + file)
if __name__ == "__main__":
    # Drive the full pipeline once, on a fixed sample directory, using
    # BWA mapping with 7 threads.
    pipeline = bam_pipeline(
        "/home/bioinformaticslab/Desktop/AMBRY/Sample_NB17",
        "Bwa",
        "Tumor",
        "306",
        "7",
    )
    pipeline.run_pipeline()
Add Comment
Please, Sign In to add comment