Advertisement
Guest User

Untitled

a guest
Nov 29th, 2015
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.02 KB | None | 0 0
  1. from itertools import takewhile, cycle
# Longest match a two-byte compression code can describe: the length field
# is 4 bits wide and stores (length - 3), so the maximum is 15 + 3 = 18.
MAX_MATCH_LENGTH = 18
# Size of the search window behind the current position: the distance field
# of a compression code is 12 bits wide and stores (distance - 1).
MAX_LOOKBEHIND = 1 << 12
  4.  
  5. def length_of_match(data, x, y, limit):
  6. """The number of matching consecutive characters starting at `x` and `y` in `data`."""
  7. return len(list(takewhile(
  8. lambda x: x[0] == x[1],
  9. zip(data[x:x+limit], data[y:y+limit])
  10. )))
  11.  
  12. def search(data, start, position, limit):
  13. """Look for the best previous match in `data` for the data at `position`.
  14. Matches are ranked by greatest length, then by shortest distance away
  15. (i.e., by greatest offset to the match position)."""
  16. candidates = range(
  17. max(start, position - MAX_LOOKBEHIND),
  18. position
  19. )
  20. return max(
  21. (length_of_match(data, previous, position, limit), previous)
  22. for previous in candidates
  23. ) if candidates else (0, 0)
  24.  
  25. def compress_raw(data, start, size):
  26. """Generator yielding blocks of either a single byte to output literally
  27. when decompressing, or a two-byte code for compression."""
  28. position = start
  29. while position < start + size:
  30. limit = min(MAX_MATCH_LENGTH, start + size - position)
  31. match_length, match_location = search(data, start, position, limit)
  32. if (match_length > 2):
  33. encoded_distance = position - match_location - 1
  34. encoded_length = match_length - 3
  35. assert encoded_length & 0xF == encoded_length
  36. assert encoded_distance & 0xFFF == encoded_distance
  37. yield bytes([
  38. ((encoded_length << 4) + ((encoded_distance >> 8) & 0xF)),
  39. encoded_distance & 0xFF
  40. ])
  41. position += match_length
  42. else:
  43. yield data[position:position+1]
  44. position += 1
  45. assert position == start + size
  46.  
  47. def compress_gen(data, start, size):
  48. """Generator yielding the compressed data, in multiple `bytes` objects."""
  49. # Header
  50. yield bytes([0x10, size & 0xff, (size >> 8) & 0xff, (size >> 16) & 0xff])
  51. output_size = 0
  52. flags = 0
  53. chunk = b''
  54. # Group raw items into chunks of eight preceded by control bytes.
  55. # In the control byte, bits are set according to whether the corresponding
  56. # item is a compression code or a literal byte.
  57. for flag, item in zip(cycle((128, 64, 32, 16, 8, 4, 2, 1)), compress_raw(data, start, size)):
  58. chunk += item
  59. if len(item) == 2: flags |= flag
  60. if flag == 1: # finished a chunk; output control byte and chunk
  61. yield bytes([flags])
  62. yield chunk
  63. output_size += 1 + len(chunk)
  64. flags = 0
  65. chunk = b''
  66. # Emit any leftovers.
  67. if chunk:
  68. yield bytes([flags])
  69. yield chunk
  70. output_size += 1 + len(chunk)
  71. # Handle padding.
  72. yield bytes(-output_size % 4)
  73.  
def compress(data, start, size):
    """Compress the region of `data` given by `start` and `size`.

    Joins every piece yielded by `compress_gen(data, start, size)` into a
    single `bytes` object and returns it.
    """
    return b''.join(compress_gen(data, start, size))
  76.  
  77. def decompress(data, start):
  78. # Determine how much decompressed data to expect.
  79. header = data[start:start+4]
  80. assert header[0] == 0x10
  81. size = (header[3] << 16) | (header[2] << 8) | header[1]
  82. result = bytearray(b'')
  83. position = start + 4
  84. # Main loop.
  85. flags = []
  86. while len(result) < size:
  87. if not flags:
  88. flag_byte = data[position]
  89. position += 1
  90. flags = [bool((flag_byte << i) & 0x80) for i in range(8)]
  91. # Check the next flag and handle it accordingly.
  92. flag, *flags = flags
  93. if flag:
  94. # Interpret a compression code.
  95. first, second = data[position:position+2]
  96. position += 2
  97. match_length = (first >> 4) + 3
  98. encoded_distance = (first & 0xF) << 8 | second
  99. match_location = len(result) - encoded_distance - 1
  100. # Doing indexing math here in order to be able to handle
  101. # the 'wrap-around' behaviour elegantly.
  102. for i in range(match_length):
  103. result.append(result[match_location + i])
  104. else:
  105. # Interpret a literal byte.
  106. result.append(data[position])
  107. position += 1
  108. assert len(result) == size
  109. # Position may be less than len(data) due to padding.
  110. # In general, we won't know how much data to read anyway.
  111. return result
  112.  
  113. def test(source, compressed, verify):
  114. """Full demo of functionality."""
  115. junk = b'SPAMSPAMSPAM'
  116. junk_size = len(junk)
  117. chunk_size = 169
  118. with open(source, 'rb') as f:
  119. data = f.read()
  120. with open(compressed, 'wb') as f:
  121. # Compress the file in two chunks by two methods,
  122. # and put some filler before them.
  123. f.write(junk)
  124. first = compress(data, 0, chunk_size)
  125. f.write(first)
  126. f.write(junk)
  127. for item in compress_gen(data, chunk_size, len(data) - chunk_size):
  128. f.write(item)
  129. with open(compressed, 'rb') as f:
  130. data = f.read()
  131. with open(verify, 'wb') as f:
  132. # Restore the original data, by skipping the filler and
  133. # decompressing both chunks.
  134. f.write(decompress(data, junk_size))
  135. f.write(decompress(data, 2 * junk_size + len(first)))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement