Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import os
- import re
- import argparse as ap
class Container:
    """Ordered holder for text lines and nested containers.

    Members are kept in insertion order. Each member is either a plain string
    or another ``Container``; nested containers are rendered recursively when
    the holder is dumped.
    """

    def __init__(self):
        self.members = []

    def add_member(self, line):
        """Append ``line`` (a string or a nested container) at the end."""
        self.members.append(line)

    def get_last(self):
        """Return the most recently added member.

        Raises:
            IndexError: if nothing has been added yet.
        """
        return self.members[-1]

    def dump(self):
        """Render every member, in order, joined by newlines."""
        pieces = []
        for member in self.members:
            if isinstance(member, Container):
                # Nested containers contribute their own rendered text.
                pieces.append(member.dump())
            else:
                pieces.append(member)
        return "\n".join(pieces)
class MSScan(Container):
    """Container for the lines of one ``<scan>`` element and its nested scans.

    After all lines have been added, ``finalize()`` extracts the scan number
    and the ``filterLine`` attribute from the collected text.
    """

    # Raw strings avoid the invalid "\=" escape the patterns used to contain.
    _NUM_PATTERN = re.compile(r'(?<=num=")[0-9]+')
    # [^"]* stops at the closing quote, so attributes that follow filterLine
    # on the same line are not swallowed into the value.
    _FILTER_LINE_PATTERN = re.compile(r'(?<=filterLine=")[^"]*(?=")')

    def __init__(self):
        super(MSScan, self).__init__()
        self.scan_num = None
        self.filter_line = None

    def finalize(self):
        """Populate ``scan_num`` and ``filter_line`` from the collected lines.

        ``scan_num`` is read from the first member (the ``<scan`` start line).
        ``filter_line`` is taken from the first string member that carries a
        ``filterLine="..."`` attribute, wherever it sits; it is set to ``""``
        when no member has one (assumes the attribute is optional in mzXML;
        TODO confirm against the schema).

        Raises:
            ValueError: if the scan has no members, or its first member is
                not a text line carrying a ``num="..."`` attribute.
        """
        if not self.members:
            raise ValueError("cannot finalize a scan with no lines")

        first_line = self.members[0]
        num_match = (self._NUM_PATTERN.search(first_line)
                     if isinstance(first_line, str) else None)
        if num_match is None:
            raise ValueError(
                "scan start line has no num attribute: {!r}".format(first_line))
        self.scan_num = int(num_match.group(0))

        self.filter_line = ""
        for member in self.members:
            # Nested scans are MSScan objects, not text; they carry their own
            # filterLine and must not be searched here.
            if not isinstance(member, str):
                continue
            filter_match = self._FILTER_LINE_PATTERN.search(member)
            if filter_match is not None:
                self.filter_line = filter_match.group(0)
                break
class MSRun(Container):
    """Container for one ``<msRun>`` element: header lines, scans, end tag.

    When dumped, scans are renumbered sequentially and the ``scanCount``
    attribute is rewritten to the number of scans actually present.
    """

    # Raw strings avoid the invalid "\=" escape the patterns used to contain.
    _SCAN_NUM_PATTERN = re.compile(r'(?<=scan num=")[0-9]+')
    _SCAN_COUNT_PATTERN = re.compile(r'(?<=scanCount=")[0-9]+')

    def __init__(self):
        super(MSRun, self).__init__()
        self.scan_count = 0
        self.start_time = 0
        self.end_time = 0

    def count_scans(self, match):
        """Regex substitution callback: return the next sequential scan number.

        ``match`` is required by the ``re.sub`` callback protocol but is not
        inspected; each call increments ``scan_count``.
        """
        self.scan_count += 1
        return str(self.scan_count)

    def sort(self):
        """Sort scans by ``(filter_line, scan_num)``, keeping the last member last.

        Non-scan members get an empty-tuple key, so they sort ahead of every
        scan and (the sort being stable) keep their relative order. The final
        member is left in place; it is presumably the ``</msRun>`` closing
        line (verify against the caller).
        """
        def sort_key(member):
            if isinstance(member, MSScan):
                return (member.filter_line, member.scan_num)
            return ()

        tail = self.members[-1:]
        self.members = sorted(self.members[:-1], key=sort_key)
        self.members += tail

    def dump(self):
        """Render the run with renumbered scans and an updated ``scanCount``."""
        rep = super(MSRun, self).dump()
        # Restart numbering on every dump; otherwise a second call would
        # continue from the previous total and write wrong scan numbers.
        self.scan_count = 0
        rep = self._SCAN_NUM_PATTERN.sub(self.count_scans, rep)
        rep = self._SCAN_COUNT_PATTERN.sub(str(self.scan_count), rep)
        return rep
class MSFile(Container):
    """Container for a whole mzXML document; adds the scan index on dump."""

    # \b keeps "<scanOrigin" (and similar longer tag names) from being
    # indexed as scans; the raw string avoids the invalid "\<" escape.
    _SCAN_START = re.compile(r'<scan\b')

    def __init__(self):
        super(MSFile, self).__init__()

    def make_index(self):
        """Append ``<index>`` / ``<indexOffset>`` lines for ``members[0]``.

        Expects ``members[0]`` to be the fully rendered document text. Offsets
        are character positions of each ``<scan`` start within that text; they
        equal byte offsets only if the text is later written in a single-byte
        encoding.
        """
        document = self.members[0]
        self.members.append(' <index name="scan" >')
        for ind, match in enumerate(self._SCAN_START.finditer(document), start=1):
            to_add = ' <offset id="{index}" >{off}</offset>'.format(
                index=ind, off=match.start(0))
            self.members.append(to_add)
        self.members.append(' </index>')
        # +2 for newline and space
        self.members.append(
            ' <indexOffset>{}</indexOffset>'.format(len(document) + 2))

    def dump(self):
        """Return the document text followed by its index and closing tag.

        The member list is restored afterwards, so the file can be dumped
        more than once without indexing its own previous output.
        """
        original_members = self.members
        try:
            # make_index() works on a single rendered string in members[0].
            self.members = [super(MSFile, self).dump()]
            self.make_index()
            self.members.append("</mzXML>\n")
            return '\n'.join(self.members)
        finally:
            self.members = original_members
class FileParser:
    """Split one mzXML file into a "left" and a "right" document.

    Top-level scans whose ``filterLine`` matches ``split_criteria`` go to the
    left document, all other scans to the right one. Every non-scan line
    (file header, run header, run end tag) is copied to both.

    Args:
        split_criteria: regular expression searched in each top-level scan's
            ``filter_line``.
        transfer_ms2: if true, scans nested inside a non-matching scan are
            also attached to the most recent matching scan.
        sort_ms1: if true, both runs are sorted after parsing.
        output_right: if true, ``write_files`` also writes the right document.
    """

    def __init__(self, split_criteria, transfer_ms2, sort_ms1, output_right):
        self.split_criteria = re.compile(split_criteria)
        self.transfer_ms2 = transfer_ms2
        self.sort_ms1 = sort_ms1
        self.output_right = output_right
        self.left_file = MSFile()
        self.right_file = MSFile()
        self.buff = None      # line currently being examined
        self.mz_file = None   # remaining lines, reversed so pop() yields the next one
        # Search helpers (raw strings; the old "\<" escapes were invalid).
        self.isRunStart = re.compile(r'<msRun')
        self.isRunEnd = re.compile(r'</msRun>')
        # \b keeps "<scanOrigin" from being mistaken for a scan start.
        self.isScanStart = re.compile(r'<scan\b')
        self.isScanEnd = re.compile(r'</scan>')
        self.isIndexStart = re.compile(r'<index')
        self.isIndexEnd = re.compile(r'</indexOffset>')

    def _advance(self):
        """Load the next input line into ``self.buff``; return False at end of input."""
        if not self.mz_file:
            return False
        self.buff = self.mz_file.pop()
        return True

    def _transfer_nested_scans(self, scan, left_run):
        """Attach scans nested in ``scan`` to the last scan of ``left_run``.

        Does nothing when the left run does not currently end with a scan
        (e.g. no matching scan has been seen yet); previously that case raised.
        NOTE: the nested scans are appended after the target's existing
        members, i.e. after its closing ``</scan>`` line, and they also remain
        members of ``scan``.
        """
        if not left_run.members:
            return
        target = left_run.get_last()
        if not isinstance(target, MSScan):
            return
        for member in scan.members:
            if isinstance(member, MSScan):
                target.add_member(member)

    def parse_scan(self):
        """Consume one ``<scan>`` element, including nested scans.

        On entry ``self.buff`` holds the scan's start line; on return it holds
        the matching ``</scan>`` line.

        Returns:
            The finalized ``MSScan``.

        Raises:
            ValueError: if the input ends before the closing ``</scan>``.
        """
        scan = MSScan()
        scan.add_member(self.buff)
        while self._advance():
            if self.isScanStart.search(self.buff):
                scan.add_member(self.parse_scan())
            elif self.isScanEnd.search(self.buff):
                scan.add_member(self.buff)
                scan.finalize()
                return scan
            else:
                scan.add_member(self.buff)
        raise ValueError("input ended inside a <scan> element (missing </scan>)")

    def parse_ms_run(self):
        """Consume the ``<msRun>`` element and distribute its scans.

        Two run containers are built side by side and appended to the left
        and right documents once the run end tag (or end of input) is reached.
        On entry ``self.buff`` holds the ``<msRun`` start line.
        """
        left_run = MSRun()
        right_run = MSRun()
        while True:
            if self.isScanStart.search(self.buff):
                scan = self.parse_scan()
                if self.split_criteria.search(scan.filter_line):
                    left_run.add_member(scan)
                else:
                    if self.transfer_ms2:
                        self._transfer_nested_scans(scan, left_run)
                    right_run.add_member(scan)
            elif self.isRunEnd.search(self.buff):
                left_run.add_member(self.buff)
                right_run.add_member(self.buff)
                break
            else:
                # Run-level header lines belong to both outputs.
                left_run.add_member(self.buff)
                right_run.add_member(self.buff)
            if not self._advance():
                break
        if self.sort_ms1:
            left_run.sort()
            right_run.sort()
        self.left_file.add_member(left_run)
        self.right_file.add_member(right_run)

    def make_index(self):
        """Skip input lines until one containing ``</indexOffset>`` is current."""
        while self.mz_file:
            if self.isIndexEnd.search(self.buff):
                break
            self.buff = self.mz_file.pop()

    def parse_file(self, file_name):
        """Read ``file_name`` and fill the left and right documents.

        Lines before ``<msRun`` are copied to both documents; everything after
        the run is ignored.
        """
        with open(file_name, mode='rb') as source:
            self.mz_file = source.read().decode('ISO-8859-1').split('\n')
        # Reverse once so that pop() hands out lines in file order.
        self.mz_file.reverse()
        self.buff = self.mz_file.pop()
        while True:
            if self.isRunStart.search(self.buff):
                self.parse_ms_run()
                break  # Ignore everything after </msRun>
            self.left_file.add_member(self.buff)
            self.right_file.add_member(self.buff)
            if not self._advance():
                break

    def write_files(self, path, left_name, right_name):
        """Write the left document, and the right one if ``output_right`` is set.

        Both are written into directory ``path``, encoded as ISO-8859-1.
        """
        with open(os.path.join(path, left_name), mode='wb') as dest:
            dest.write(self.left_file.dump().encode('ISO-8859-1'))
        if self.output_right:
            with open(os.path.join(path, right_name), mode='wb') as dest:
                dest.write(self.right_file.dump().encode('ISO-8859-1'))
# Command-line entry point: split each input mzXML file into a "left" file
# (scans matching the regex) and, optionally, a "right" file (the rest).
if __name__ == "__main__":
    arg_parser = ap.ArgumentParser(prog="mzXMLparser",
                                   usage="""
This program is meant for the task of splitting mzXML files based on the MS1 filter line into 2 child files,
named left and right. These prefixes can be changed with the -p1 or -p2 command. The script can also sort MS1s.
python mzxml_splitter.py [args] file1 file2 ...
"""
                                   )
    arg_parser.add_argument("-t", "--transfer", action="store_true",
                            help="Whether to transfer ms2 from non-matching MS1 into matching MS1")
    arg_parser.add_argument("-s", "--sort", action="store_true",
                            help="Whether to sort the MS1 scans once they are split (sorts both files). Sorting is on filterLine first, then original scan number.")
    arg_parser.add_argument("-or", "--output_right", action="store_true",
                            help="Whether to output a mzXML file with MS1s that don't match your REGEX")
    arg_parser.add_argument("-r", "--regex", default='',
                            help="REGEX to split MS1 scans on. Searches the filterLine attribute of MS1 scans.")
    arg_parser.add_argument("-p1", "--prefix1", default='left',
                            help="Prefix to add to file of scans which MATCH specified criteria")
    arg_parser.add_argument("-p2", "--prefix2", default='right',
                            help="Prefix to add to file of scans which DON'T MATCH specified criteria")
    arg_parser.add_argument("-d", "--directory", default='./',
                            help="Output directory for final files")
    arg_parser.add_argument('files', nargs='+')
    args = arg_parser.parse_args()

    # Fail before any parsing work if the outputs could not be written anyway.
    if not os.path.isdir(args.directory):
        arg_parser.error(
            "output directory does not exist: {}".format(args.directory))

    for f in args.files:
        # A fresh parser per input: FileParser accumulates state across calls.
        file_parser = FileParser(args.regex, args.transfer, args.sort, args.output_right)
        file_parser.parse_file(f)
        # Output names are "<prefix>_<input file name>" inside the output directory.
        base_name = os.path.split(f)[1]
        file_parser.write_files(args.directory,
                                args.prefix1 + "_" + base_name,
                                args.prefix2 + "_" + base_name)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement