Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from __future__ import with_statement
- from heapq import heapify, heappop, heappushpop
- from contextlib import closing
- from itertools import count
- from mmap import mmap
try:
    from collections import Counter
except ImportError:
    # Only a missing collections.Counter should trigger the fallback; any
    # other error during import is a real problem and must propagate.
    def Counter(iterable):
        """Return a mapping of each item in *iterable* to its number of
        occurrences (minimal stand-in for collections.Counter)"""
        from collections import defaultdict
        frequencies = defaultdict(int)
        for item in iterable:
            frequencies[item] += 1
        return frequencies
class BitWriter(object):
    """Accepts bitstrings and writes to a file-like object

    Bits are strings of '0'/'1' characters.  They are buffered in
    ``self.remainder`` until at least eight are available; complete bytes are
    then passed to ``bitfile.write``.

    bitfile -- object with a ``write`` method
    codes   -- optional zero-argument callable returning an iterable of the
               valid bit codes; only consulted by ``bitpad``
    """

    def __init__(self, bitfile, codes=None):
        self.bitfile = bitfile
        self.remainder = ''
        self.codes = codes

    @staticmethod
    def tobytes(bits):
        """Return a generator of one character per 8-bit group of *bits*"""
        return (chr(int(bits[i:i + 8], 2)) for i in range(0, len(bits), 8))

    def write(self, bits):
        """Buffer *bits*, writing out every complete byte accumulated so far"""
        bits = self.remainder + bits
        fullbits = len(bits) // 8 * 8
        # Keep the bits that do not fill a byte for the next call.
        self.remainder = bits[fullbits:]
        if fullbits:
            self.bitfile.write(''.join(self.tobytes(bits[:fullbits])))

    def bitpad(self):
        """Smart bitpadder - Uses a sequence that isn't a valid code to bitpad
        This is an alternative to a prepended bit signifying padding
        It doesn't require the reader know anything about the bitpadding

        Pads the buffered remainder up to a byte boundary and writes it.
        When no codes were supplied, or every candidate pad begins with a
        valid code, a warning is printed and zero bits are used instead."""
        tail = -len(self.remainder) % 8
        if not tail:
            # Already byte aligned: nothing to pad, nothing to flush.
            return
        pad = ''
        if self.codes:
            codes = tuple(self.codes())
            for i in range(2 ** tail):
                candidate = bin(i)[2:].zfill(tail)
                # A pad is safe only if no code is a prefix of it.
                if not candidate.startswith(codes):
                    pad = candidate
                    break
        if not pad:
            print("Possible code collision with zero padding")
            pad = '0' * tail
        self.write(pad)
class BitReader(object):
    """Reads from a file-like object and returns bitstrings

    bitfile -- object with a ``read([n])`` method returning a byte string.
    Bits read from the file but not yet handed out are kept in
    ``self.remainder``.
    """

    def __init__(self, bitfile):
        self.bitfile = bitfile
        self.remainder = ''

    @staticmethod
    def tobits(bytestring):
        """Yield an 8-character '0'/'1' string for each character given"""
        for c in bytestring:
            yield bin(ord(c))[2:].zfill(8)

    def __iter__(self):
        """Iterate one bit at a time until read(1) returns ''"""
        return iter(lambda: self.read(1), '')

    def read(self, bits=None):
        """Return the next *bits* bits as a string of '0'/'1' characters

        With ``bits`` None, return the buffered remainder followed by
        everything left in the file.  Fewer bits than requested are returned
        when the file runs out."""
        remainder = self.remainder
        if bits is None:
            self.remainder = ''
            # The wrapped file lives in self.bitfile (set in __init__).
            return remainder + ''.join(self.tobits(self.bitfile.read()))
        rlen = len(remainder)
        if bits <= rlen:
            # Request can be served entirely from the buffered bits.
            self.remainder = remainder[bits:]
            return remainder[:bits]
        bits -= rlen
        # Read just enough whole bytes to cover the bits still missing.
        numbytes = (bits + 7) // 8
        data = ''.join(self.tobits(self.bitfile.read(numbytes)))
        self.remainder = data[bits:]
        return remainder + data[:bits]
class BitFileReader(BitReader):
    """BitReader that opens and closes its own file

    Built from a filename rather than an open file object; the file is
    opened by ``open`` (or on entering a ``with`` block) and closed by
    ``close`` (or on leaving the ``with`` block)."""

    def __init__(self, filename):
        self.filename = filename
        self.remainder = ''

    def open(self):
        """Open the named file for binary reading and return this reader"""
        handle = open(self.filename, 'rb')
        self.bitfile = handle
        return self

    def close(self, *args):
        """Close the file; any extra arguments (as passed to __exit__) are
        ignored"""
        self.bitfile.close()

    # The context-manager protocol reuses open/close directly.
    __enter__, __exit__ = open, close
class BitFileWriter(BitWriter):
    """BitWriter that opens and closes its file and bitpads itself

    filename -- path of the file to create; opened by ``open`` or on entering
                a ``with`` block
    codes    -- optional zero-argument callable returning the valid bit codes,
                used when padding the final byte
    """

    def __init__(self, filename, codes=None):
        self.filename = filename
        self.remainder = ''
        self.codes = codes

    def open(self):
        """Open the named file for binary writing and return this writer"""
        self.bitfile = open(self.filename, 'wb')
        return self

    def close(self, *args):
        """Pad and flush the last partial byte, then close the file

        Extra arguments (as passed to __exit__) are ignored."""
        try:
            self.bitpad()
        finally:
            # Release the file handle even if padding/writing fails.
            self.bitfile.close()

    __enter__ = open
    __exit__ = close
def codetree(node):
    """Assign a bit code to every node below *node*

    Each node is a list whose items 1 and 2 are its two children and whose
    item 3 is its code.  A child receives its parent's code plus '0' (first
    child) or '1' (second child); children whose own item 1 is a list are
    descended into in turn."""
    pending = [node]
    while pending:
        parent = pending.pop()
        prefix = parent[3]
        for bit, child in zip('01', parent[1:3]):
            child[3] = prefix + bit
            if isinstance(child[1], list):
                pending.append(child)
def codedata(indata):
    """Creates a mapping of byte to code by
    1. generating a heap of nodes
    2. turning them into a tree by frequency
    3. recursively coding the tree

    indata -- iterable of bytes (hashable items)
    Returns a Codec mapping each distinct byte to its bit code.  Empty input
    gives an empty Codec; input with one distinct byte gives that byte the
    code '0'.
    """
    index = count()
    # Leaf nodes are [frequency, unique index, byte, code].
    heap = [[freq, next(index), byte, '']
            for byte, freq in Counter(indata).items()]
    if not heap:
        # Nothing to code, and heappop below would fail on an empty heap.
        return Codec()
    if len(heap) == 1:
        # A lone leaf has no children for codetree to label, so give it a
        # one-bit code directly.
        heap[0][3] = '0'
        return Codec(node[2:] for node in heap)
    heapify(heap)
    # Build the tree on a copy so `heap` keeps a reference to every leaf.
    tempheap = heap[:]
    head = heappop(tempheap)
    while tempheap:
        temp = heappop(tempheap)
        # Internal nodes are [summed frequency, child, child, code].
        head = heappushpop(tempheap, [head[0] + temp[0], head, temp, ''])
    codetree(head)
    return Codec(node[2:] for node in heap)
def codefile(filename):
    """Open a file, mmap it, and code it

    Returns the result of codedata() applied to the file's contents.  The
    file is opened and mapped read-only, so no write permission is needed."""
    from mmap import ACCESS_READ
    with open(filename, 'rb') as infile:
        with closing(mmap(infile.fileno(), 0, access=ACCESS_READ)) as indata:
            return codedata(indata)
def loadcodetable(codetablefilename):
    """Load a table from a file

    Each entry is stored as the byte, then its code, then a newline.
    Correctly interprets newlines and nul bytes
    CSV Can't do nul bytes and shelve is awkward and overkill

    Returns a Codec mapping each byte to its code."""
    table = Codec()
    # Holds a key whose line ended right after it (the key was itself the
    # newline), so its code arrives on the following line.
    pending = False
    with open(codetablefilename, 'rb') as codetablefile:
        for line in codetablefile:
            if pending:
                key, code = pending, line
            else:
                key, code = line[0], line[1:]
                if not code:
                    pending = key
                    continue
            # Drop the final character of the line (the newline terminator).
            table[key] = code[:-1]
            pending = False
    return table
def fliptable(table):
    """Swap keys and values of *table* in place and return it

    Works as expected if the values are unique"""
    flipped = []
    for _ in range(len(table)):
        key, value = table.popitem()
        flipped.append((value, key))
    table.update(flipped)
    return table
def keysarebytes(table):
    """Determine if a table is a bytetable or a codetable

    Returns True when every key has length 1 (also for an empty table),
    False as soon as a key of any other length is found."""
    for key in table:
        if len(key) != 1:
            return False
    return True
def dumptable(table, tablefilename):
    """Dump a table to a file
    Uses a custom one item per line format: byte, code, newline

    A table whose keys are not all single bytes is first flipped with
    fliptable(), which changes *table* itself.  Returns *table*."""
    outtable = table
    if not keysarebytes(table):
        outtable = fliptable(table)
    with open(tablefilename, 'wb') as tablefile:
        tablefile.writelines(byte + code + '\n'
                             for byte, code in outtable.iteritems())
    return table
def encode(bytetable, indata):
    """Lazily yield the table entry for each item of a string-like object

    bytetable -- mapping of byte to code
    indata    -- iterable of bytes; each one must be a key of bytetable"""
    for symbol in indata:
        code = bytetable[symbol]
        yield code
def decode(codetable, encodeddata, code=''):
    """Decode a bitstring-like object

    Bits are appended to the pending code (initially *code*) until it is a
    key of *codetable*; the matching value is then yielded and the pending
    code starts over empty.  Bits left over at the end are dropped."""
    pending = code
    for bit in encodeddata:
        pending += bit
        if pending not in codetable:
            continue
        yield codetable[pending]
        pending = ''
def writeencodefile(bytetable, infilename, encodedfilename):
    """Read unencoded data from a file and encode it to another file

    bytetable       -- mapping of byte to code; its codes are also handed to
                       the writer for padding the final byte
    infilename      -- file to read; opened and mapped read-only, so no
                       write permission on the input is needed
    encodedfilename -- file to create with the encoded bits
    Returns bytetable."""
    from mmap import ACCESS_READ
    with BitFileWriter(encodedfilename, bytetable.itervalues) as encodedfile:
        with open(infilename, 'rb') as infile:
            with closing(mmap(infile.fileno(), 0,
                              access=ACCESS_READ)) as indata:
                for code in encode(bytetable, indata):
                    encodedfile.write(code)
    return bytetable
def writedecodefile(codetable, encodedfilename, decodedfilename):
    """Decode an encoded file and save the decoded version

    codetable       -- mapping of code to byte
    encodedfilename -- file to read bit by bit
    decodedfilename -- file to create with the decoded bytes
    Returns codetable."""
    with BitFileReader(encodedfilename) as encodeddata:
        with open(decodedfilename, 'wb') as decodedfile:
            decodedfile.writelines(decode(codetable, encodeddata))
    return codetable
class Codec(dict):
    """A dict holding either a byte-to-code or a code-to-byte table

    Given raw data, creates a mapping of bytes to codes
    Given a mapping of bytes to codes, Huffman encodes binary data
    Given a mapping of codes to bytes, decodes Huffman encoded binary data
    Includes utility functions for file and dictionary operations

    The module-level functions are attached here unchanged, so each can be
    used either as a plain function or through a Codec."""

    # Table builders/loaders: wrapped as static methods, so no instance is
    # passed to them.
    codefile = staticmethod(codefile)
    codedata = staticmethod(codedata)
    loadcodetable = staticmethod(loadcodetable)

    # Table utilities: as methods they receive the Codec as their first
    # (table) argument.
    keysarebytes = keysarebytes
    fliptable = fliptable
    dumptable = dumptable

    # Encoding and decoding, in memory and file to file.
    encode = encode
    decode = decode
    writeencodefile = writeencodefile
    writedecodefile = writedecodefile
# Names exported by `from <module> import *`.
__all__ = [
    'Codec',
    'BitFileReader',
    'BitFileWriter',
    'BitReader',
    'BitWriter',
]
def test():
    """Round-trip this module's own file through the codec in three styles

    Each pass codes the file, dumps the table, encodes the file, reloads and
    flips the table, decodes, and asserts the decoded bytes equal the
    original.  Creates '.codetable', '.encoded' and '.decoded' files next to
    the module file."""
    filename = __file__
    tablename = filename + '.codetable'
    encodedname = filename + '.encoded'
    decodedname = filename + '.decoded'

    with open(filename, 'rb') as infile:
        indata = infile.read()

    def decoded():
        """Return the current contents of the decoded output file"""
        with open(decodedname, 'rb') as outfile:
            return outfile.read()

    # Test Functional
    # Everything is either a generator or returns a Codec
    # (or whatever dict was passed in)
    table = codefile(filename)
    table = table.dumptable(tablename)
    table = table.writeencodefile(filename, encodedname)
    table = table.loadcodetable(tablename)
    table = table.fliptable()
    table.writedecodefile(encodedname, decodedname)
    assert indata == decoded()

    # Test Procedural
    # Everything works fine called unbound
    # They'll all take regular dicts too
    bytetable = dumptable(codefile(filename), tablename)
    writeencodefile(bytetable, filename, encodedname)
    codetable = fliptable(loadcodetable(tablename))
    writedecodefile(codetable, encodedname, decodedname)
    assert indata == decoded()

    # Test OO
    # You can also call Codec(byte_or_codetable) with a normal dict
    codec = Codec.codefile(filename)
    codec.writeencodefile(filename, encodedname)
    codec.dumptable(tablename)
    codec = Codec.loadcodetable(tablename)
    codec.fliptable()
    codec.writedecodefile(encodedname, decodedname)
    assert indata == decoded()

    print(filename + " " + "successfully compressed and decompressed.")
if __name__ == "__main__":
    # Run the self-test when executed as a script.
    # For a profile of the run, use instead: __import__('cProfile').run("test()")
    test()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement