Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import gzip, zipfile, bz2, io
# Leading "magic" bytes that identify each supported compression format.
_COMPRESSION_MAGIC = {
    b"\x1f\x8b\x08": "gz",
    b"\x42\x5a\x68": "bz2",
    b"\x50\x4b\x03\x04": "zip",
}


def _sniff_compression(filename):
    """Return "gz", "bz2", "zip" or "uncompressed" based on the file header."""
    # Only as many bytes as the longest signature are needed.
    header_len = max(len(magic) for magic in _COMPRESSION_MAGIC)
    with open(filename, 'rb') as handle:
        header = handle.read(header_len)
    for magic, kind in _COMPRESSION_MAGIC.items():
        if header.startswith(magic):
            return kind
    return "uncompressed"


def read_compressed(filename):
    """
    Open *filename* for reading as text, decompressing it when necessary.

    The compression format is detected from the first bytes of the file
    rather than from its extension. Gzip, bzip2 and single-member zip
    archives are supported; anything else is opened as a plain text file.

    Args:
        filename: Path of the file to open.

    Returns:
        A text-mode file object positioned at the start of the content.
        The caller is responsible for closing it.

    Raises:
        IOError: If the file is a zip archive that does not contain
            exactly one member.
    """
    kind = _sniff_compression(filename)

    if kind == 'gz':
        return gzip.open(filename, 'rt')

    if kind == 'bz2':
        # bz2.BZ2File only accepts binary modes and rejects 'rt' with a
        # ValueError; bz2.open is the text-mode capable entry point.
        return bz2.open(filename, 'rt')

    if kind == 'zip':
        archive = zipfile.ZipFile(filename, 'r')
        try:
            members = archive.namelist()
            if len(members) != 1:
                raise IOError(
                    "Only a single fastq file must be in the zip archive.")
            # ZipFile.open yields a binary stream.
            member = archive.open(members[0], 'r')
        except Exception:
            # Do not leak the archive handle when the open is rejected.
            archive.close()
            raise
        # The archive itself is left open on success, as the member stream
        # is read lazily by the caller; io is used to expose it as text.
        return io.TextIOWrapper(member)

    return open(filename, 'r')
Add Comment
Please, Sign In to add comment