Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from Bio.Seq import Seq
- from Bio.Seq import reverse_complement
- dna_bases = ('A', 'C', 'G', 'T')
- purines = ('A', 'G')
- pyrimidines = ('C', 'T')
- transitions = ('A>T', 'C>G', 'G>C', 'T>A')
- transversions = ('A>C', 'A>G', 'C>A', 'C>T', 'G>A', 'G>T', 'T>C', 'T>G')
- class Mutation(object):
- def __init__(self, ref, base, chrom, position, context):
- self.ref = ref
- self.base = base
- self.chrom = chrom
- self.position = int(position)
- self.context = context
- @property
- def substitution(self):
- return Seq(''.join([self.ref, '>', self.base]))
- @property
- def context(self):
- return Seq(''.join([self._5prime, self.ref, self._3prime]))
- @context.setter
- def context(self, context):
- self._5prime = context[:len(context) - 2]
- self._3prime = context[len(context) - 1:]
- @property
- def label(self):
- return ''.join([self._5prime.upper(),
- '[',
- str(self.substitution).upper(),
- ']',
- self._3prime.upper()])
- def __eq__(self, other):
- return ''.join([str(self.chrom),
- str(self.position),
- str(self.substitution),
- str(self.context)]) == other
- def __hash__(self):
- return hash(''.join([str(self.chrom),
- str(self.position),
- str(self.substitution),
- str(self.context)]))
- def __lt__(self, other):
- return self.position < other.position
- def __repr__(self):
- return 'Mutation: {} in {}\nPosition: {}:{:,}\n'.format(
- str(self.substitution),
- str(self.context),
- self.chrom,
- self.position)
- def mutpos_depths(infile, chromosome=None, start=0, end=None, fmt='essigmann'):
- depths = {}
- with open(infile, 'r') as handle:
- for line in handle:
- # Strip newline characters and split on tabs
- line = line.strip().split('\t')
- # Unpack line and cast to proper data type
- chrom = str(line[0])
- position, depth = int(line[2]) - 1, int(line[3])
- if chromosome is not None and chrom != chromosome:
- continue
- if position < start:
- continue
- if end is not None and position >= end:
- continue
- if fmt is 'essigmann':
- A, C, G, T, N = map(int, line[4:9])
- elif fmt is 'loeb':
- T, C, G, A = map(int, line[5:9])
- N = int(line[11])
- else:
- raise ValueError('Format must be essigmann or loeb')
- depths[position] = depth - N
- result = []
- for position in range(start, end):
- if position in depths.keys():
- result.append(depths[position])
- else:
- result.append(0)
- return result
- def fasta_to_dict(ref_file):
- from Bio import SeqIO
- with open(ref_file, 'r') as handle:
- return SeqIO.to_dict(SeqIO.parse(handle, 'fasta'))
- def from_mutpos(mutpos_file, ref_file, clonality=(0, 1), min_depth=100, kmer=3,
- chromosome=None, start=0, end=None, notation='pyrimidine',
- fmt='essigmann', verbose=True):
- mutations = []
- record_dict = fasta_to_dict(ref_file)
- # Open the mutpos_file and read data line by line
- with open(mutpos_file, 'r') as handle:
- for line in handle:
- # Strip newline characters and split on tabs
- line = line.strip().split('\t')
- # Unpack line and cast to proper data type
- chrom, ref = str(line[0]), str(line[1]).upper()
- position, depth = int(line[2]) - 1, int(line[3])
- if chromosome is not None and chrom != chromosome:
- continue
- if position < start:
- continue
- if end is not None and position >= end:
- continue
- if fmt is 'essigmann':
- A, C, G, T, N = map(int, line[4:9])
- elif fmt is 'loeb':
- T, C, G, A = map(int, line[5:9])
- N = int(line[11])
- else:
- raise ValueError('Format must be essigmann or loeb')
- # If we see no observed base substitutions, skip this loop
- if sum([A, C, G, T]) == 0:
- continue
- # Read mapped depth (minus Ns) at this position must be greater
- # than min_depth, if not, skip this loop
- if min_depth > depth:
- continue
- # Get the kmer context at the given position in the given
- # chromosome as looked up in record_dict. If we encounter an
- # edge case (return of None) we'll skip this loop
- context = str(get_kmer(record_dict, chrom, position, kmer)).upper()
- if context is None:
- continue
- # If the base is not in our intended labeling set: let's get
- # the complement of the reference base, the reverse complement
- # of the trinucleotide context, and the mutation counts for the
- # complement of the base substitution observed
- if (
- (notation == 'pyrimidine' and ref in purines) or
- (notation == 'purine' and ref in pyrimidines)
- ):
- ref = reverse_complement(ref)
- context = reverse_complement(context)
- A, G, C, T = T, C, G, A
- for base, num_mutations in zip(dna_bases, [A, C, G, T]):
- # Clonality is defined as the frequency of any base
- # substitution at this one genomic position. For example, if
- # there are 5% T, 10% C, and 100% G in a reference position of
- # A, we can selet a clonality filter of (0.1, 0.5) to
- # eliminate the rare T mutations and clonal G mutations.
- base_clonality = num_mutations / (depth - N)
- if not min(clonality) < base_clonality < max(clonality):
- continue
- for _ in range(num_mutations):
- mutation = Mutation(ref, base, chrom, position, context)
- mutation.depth = depth
- mutation.observed = num_mutations
- mutation.VAF = base_clonality
- mutations.append(mutation)
- if verbose is True:
- print('Found {} Mutations'.format(len(mutations)))
- return mutations
- def read_mutations(infile, gene_filter=None):
- mutations = []
- with open(infile) as handle:
- for line in handle:
- sample, chromosome, position, gene, substitution, \
- context, observed, depth, VAF = line.strip().split(',')
- position = int(position)
- if (gene_filter is not None and
- gene != gene_filter.name):
- continue
- if (gene_filter is not None and
- position < gene_filter.start):
- continue
- if (gene_filter is not None and
- position > gene_filter.end):
- continue
- mutation = Mutation(substitution[0],
- substitution[2],
- chromosome,
- position,
- context)
- mutation.VAF = float(VAF)
- mutation.sample, mutation.gene = sample, gene
- mutation.depth, mutation.observed = int(depth), int(observed)
- mutations.append(mutation)
- return mutations
Add Comment
Please, Sign In to add comment