Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
from itertools import takewhile, cycle

# Longest match a two-byte code can express: the length field is 4 bits,
# stored as (match_length - 3), so 15 + 3 = 18.
MAX_MATCH_LENGTH = 18
# Farthest back a match may reach: the code stores a 12-bit distance.
MAX_LOOKBEHIND = 1 << 12
def length_of_match(data, x, y, limit):
    """Count how many consecutive bytes agree starting at `x` and `y` in `data`.

    At most `limit` positions are compared; the count also stops at the
    end of `data` for whichever of the two runs is shorter.
    """
    count = 0
    for left, right in zip(data[x:x + limit], data[y:y + limit]):
        if left != right:
            break
        count += 1
    return count
def search(data, start, position, limit):
    """Look for the best previous match in `data` for the data at `position`.

    Only positions in [max(start, position - MAX_LOOKBEHIND), position) are
    considered.  Returns a ``(length, location)`` pair; matches are ranked
    by greatest length, then by greatest location (i.e. smallest distance
    back).  When there are no candidate positions, returns ``(0, 0)``.
    """
    earliest = max(start, position - MAX_LOOKBEHIND)
    best = None
    for location in range(earliest, position):
        candidate = (length_of_match(data, location, position, limit), location)
        # Strict comparison keeps the later (closer) location on equal length,
        # matching tuple ordering: longer first, then greater location.
        if best is None or candidate > best:
            best = candidate
    return (0, 0) if best is None else best
def compress_raw(data, start, size):
    """Generator yielding raw items for the window ``data[start:start+size]``.

    Each item is either a single byte to copy literally when decompressing,
    or a two-byte back-reference code (4-bit length, 12-bit distance).
    """
    end = start + size
    cursor = start
    while cursor < end:
        window = min(MAX_MATCH_LENGTH, end - cursor)
        length, location = search(data, start, cursor, window)
        if length <= 2:
            # A code costs two bytes, so matches shorter than 3 don't pay off.
            yield data[cursor:cursor + 1]
            cursor += 1
            continue
        distance = cursor - location - 1
        packed_length = length - 3
        # Both fields must fit their bit widths in the two-byte code.
        assert packed_length & 0xF == packed_length
        assert distance & 0xFFF == distance
        yield bytes([
            (packed_length << 4) | ((distance >> 8) & 0xF),
            distance & 0xFF,
        ])
        cursor += length
    assert cursor == end
def compress_gen(data, start, size):
    """Generator yielding the compressed stream, in multiple `bytes` objects.

    The stream starts with a 4-byte header (0x10 marker plus 24-bit size),
    then groups of up to eight items, each group preceded by a control byte
    whose bits (MSB first) mark which items are two-byte codes rather than
    literals.  Trailing zero bytes pad the payload to a multiple of four.
    """
    # Header: type marker then the decompressed size, little-endian, 3 bytes.
    yield bytes([0x10, size & 0xff, (size >> 8) & 0xff, (size >> 16) & 0xff])
    emitted = 0
    control = 0
    pending = b''
    masks = cycle((128, 64, 32, 16, 8, 4, 2, 1))
    for mask, item in zip(masks, compress_raw(data, start, size)):
        pending += item
        if len(item) == 2:
            control |= mask
        if mask == 1:
            # Eight items gathered: flush the control byte and the group.
            yield bytes([control])
            yield pending
            emitted += 1 + len(pending)
            control = 0
            pending = b''
    # Flush any partial final group.
    if pending:
        yield bytes([control])
        yield pending
        emitted += 1 + len(pending)
    # Pad the payload (header excluded; it is already 4 bytes) to 4-byte alignment.
    yield bytes(-emitted % 4)
def compress(data, start, size):
    """Compress ``data[start:start+size]`` and return the stream as one `bytes`."""
    buffer = bytearray()
    for piece in compress_gen(data, start, size):
        buffer += piece
    return bytes(buffer)
def decompress(data, start):
    """Decompress a type-0x10 stream beginning at `start` in `data`.

    Returns a `bytearray` of the decompressed bytes.  Reads exactly as much
    of `data` as the stream needs; any padding after it is ignored (in
    general the caller can't know the compressed length up front anyway).
    """
    # The header gives the marker byte and the 24-bit decompressed size.
    header = data[start:start + 4]
    assert header[0] == 0x10
    expected = (header[3] << 16) | (header[2] << 8) | header[1]
    out = bytearray()
    cursor = start + 4
    pending_flags = []
    while len(out) < expected:
        if not pending_flags:
            # New group: read the control byte and unpack its bits, MSB first.
            control = data[cursor]
            cursor += 1
            pending_flags = [bool((control << i) & 0x80) for i in range(8)]
        if pending_flags.pop(0):
            # Two-byte back-reference code.
            first, second = data[cursor:cursor + 2]
            cursor += 2
            count = (first >> 4) + 3
            distance = ((first & 0xF) << 8) | second
            origin = len(out) - distance - 1
            # Copy byte by byte so overlapping ('wrap-around') matches work:
            # bytes appended earlier in this loop may be read again.
            for offset in range(count):
                out.append(out[origin + offset])
        else:
            # Literal byte.
            out.append(data[cursor])
            cursor += 1
    assert len(out) == expected
    return out
def test(source, compressed, verify):
    """Full demo of functionality.

    Compresses the file `source` into `compressed` as two chunks produced
    by two methods (`compress` and `compress_gen`), with filler bytes before
    each, then decompresses both chunks into `verify` to restore the data.
    """
    filler = b'SPAMSPAMSPAM'
    first_chunk = 169
    with open(source, 'rb') as handle:
        original = handle.read()
    with open(compressed, 'wb') as handle:
        # Filler, first chunk via compress(), filler, rest via compress_gen().
        handle.write(filler)
        head = compress(original, 0, first_chunk)
        handle.write(head)
        handle.write(filler)
        for piece in compress_gen(original, first_chunk, len(original) - first_chunk):
            handle.write(piece)
    with open(compressed, 'rb') as handle:
        packed = handle.read()
    with open(verify, 'wb') as handle:
        # Skip the filler runs and decompress both chunks back out.
        handle.write(decompress(packed, len(filler)))
        handle.write(decompress(packed, 2 * len(filler) + len(head)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement