Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- """
- A fairly straight port of Haruhiko Okumura's LZSS codec from C.
- Nothing really clever, and not very pythonic. (I'm not that good at this honestly.)
- """
- # Like its C equiv., has problems when idx isn't what it would like. Tied to node init, and instead of attempting to fix this came up with an equally problematic ringless-ring encoder.
- def encode(data, ring, limit, threshold, idx, fill, byteorder, condition=True):
- """
- Compresses [data], returning a bytes object.
- [ring] may be any size between 0x4000 and 0x100.
- [limit] set the longest length for a copy; proper values depend on ring and threshold.
- [threshold] is the shortest acceptable copy, minus one.
- [idx] is typically ring - limit but may differ on application.
- [fill] is used to initialize the ring.
- *) For text applications use a space (b' ', or ASCII code 0x20)
- *) Binary applications use a zero (b'\\0' or 0)
- [byteorder] sets the byteorder copy commands are written in.
- Usually this is 'little', but 'big' is also acceptable.
- [condition] sets the flag state used for literals and dictionary hits.
- """
- output = bytearray()
- if not data: return output
- from itertools import repeat
- # Initialize the ring buffer with a common fill value.
- if isinstance(fill, (bytes, bytearray)):
- fill = fill[0]
- rng = bytearray(repeat(fill, ring + limit))
- matchlen, matchpos, nil = 0, 0, ring
- # Initialize the trees.
- lson= list(bytes(ring + 1))
- rson= list(bytes(ring + 1))
- rson.extend(repeat(nil, 256))
- dad = list(repeat(nil, ring + 1))
- # Create the masks for encoding values.
- # This assumes minimum ring of 0x100, maximum length of 0x100 + threashold + 1.
- r_mask = ring - 1
- b_shft = r_mask.bit_length() - 8
- b_mask = (0xFF00 >> b_shft) & 0xFF
- condition = 0x80 if condition else 0
- noncondition = condition ^ 0x80
- def InsertNode(v):
- """Inserts string into trees and returns
- longest match position and length as a tuple.
- If match length equals the limit, it replaces the old node.
- By embedding this function it can use the trees and ring globally."""
- cmp, ml, mp = 1, 0, matchpos
- key = rng[v:]
- pos = ring + 1 + key[0]
- rson[v] = nil
- lson[v] = nil
- while True:
- if cmp>=0:
- if rson[pos] != nil:
- pos = rson[pos]
- else:
- rson[pos] = v
- dad[v] = pos
- return (ml, mp)
- else:
- if lson[pos] != nil:
- pos = lson[pos]
- else:
- lson[pos] = v
- dad[v] = pos
- return (ml, mp)
- for i in range(1, limit+1):
- cmp = key[i] - rng[pos + i]
- if cmp:
- break
- if i > ml:
- mp, ml = pos, i
- if ml >= limit: break
- dad[v] = dad[pos]
- lson[v]=lson[pos]
- rson[v]=rson[pos]
- dad[lson[pos]] = v
- dad[rson[pos]] = v
- if rson[dad[pos]] == pos:
- rson[dad[pos]] = v
- else:
- lson[dad[pos]] = v
- dad[pos] = nil
- return (ml, mp)
- def DeleteNode(pos):
- if dad[pos] == nil: return
- # If it's in the tree, delete it.
- q = lson[pos]
- if lson[pos] == nil:
- q = rson[pos]
- elif rson[pos] != nil:
- if rson[q] != nil:
- while True:
- q = rson[q]
- if rson[q] == nil: break
- rson[dad[q]] = lson[q]
- dad[lson[q]] = dad[q]
- lson[q] = lson[pos]
- dad[lson[pos]] = q
- rson[q] = rson[pos]
- dad[rson[pos]] = q
- dad[q] = dad[pos]
- if rson[dad[pos]] == pos:
- rson[dad[pos]] = q
- else:
- lson[dad[pos]] = q
- dad[pos] = nil
- # Unset flags on copies; send when less than 256.
- mask = 0xFF00
- codebuf = bytearray()
- s = 0
- # Read limit bytes into the ring at idx.
- p = min(limit, len(data)-1)
- rng[idx:idx+p] = data[0:p]
- cur = p
- for i in range(1, limit+1):
- matchlen, matchpos = InsertNode(idx - i)
- matchlen, matchpos = InsertNode(idx)
- # Now you're initialized, so do the rest of the file.
- while True:
- mask>>=1
- matchlen = min(matchlen, p)
- if matchlen <= threshold:
- matchlen = 1
- codebuf.append(rng[idx])
- mask ^= noncondition
- else:
- mask ^= condition
- a = (matchpos>>b_shft) & b_mask
- a |= (matchlen - threshold - 1)
- b = matchpos&0xFF
- if byteorder == 'little':
- codebuf.extend((b,a))
- else:
- codebuf.extend((a,b))
- # Flush when the mask is full.
- if mask < 256:
- output.append(mask)
- output.extend(codebuf)
- codebuf = bytearray()
- mask = 0xFF00
- prevmatchlen = matchlen
- j = min(prevmatchlen, len(data)-cur)
- for i in range(j):
- DeleteNode(s)
- rng[s] = data[cur]
- if s < (limit - 1):
- rng[s + ring] = data[cur]
- cur+=1
- # Correct the ring position via modulo ring
- s+=1
- s&= r_mask
- idx+=1
- idx&= r_mask
- matchlen, matchpos = InsertNode(idx)
- # Flush the rest of the buffer if necessary.
- for i in range(j, prevmatchlen):
- DeleteNode(s)
- s+=1
- s&= r_mask
- idx+=1
- idx&= r_mask
- p-=1
- if p:
- matchlen, matchpos = InsertNode(idx)
- # Loop until source empty.
- if not p:
- break
- # Flush remaining output; mask the lead bit off the mask.
- if codebuf:
- ## mask &= ~(1<<mask.bit_length()-1) # bottom to top bitorder.
- l = mask.bit_length() - 8
- mask &= 0xFF
- output.append(mask>>l)
- output.extend(codebuf)
- return output
- def decode(data, out_sz, ring, threshold, idx, fill, byteorder, bitorder='bottom', condition=True, count=-1, relative=False):
- """
- Decompresses [data] originally compressed with encode(),
- returning a bytes object.
- if [out_sz] is None, will decompress to the end of file.
- [ring], [threshold], [idx], [fill], and [byteorder]
- should match the original compression settings.
- [ring] may be any size between 0x4000 and 0x100.
- [threshold] is the shortest acceptable copy, minus one.
- [idx] is typically ring - limit but may differ on application.
- [fill] is used to initialize the ring.
- *) For text applications use a space (b' ', or ASCII code 0x20)
- *) Binary applications use a zero (b'\\0' or 0)
- [byteorder] sets the byteorder copy commands are written in.
- Usually this is 'little', but 'big' is also acceptable.
- [bitorder] indicates if command bits are read from the 'bottom' or 'top'.
- [condition] sets if a literal is written when a flag is True or False.
- [count], when set, terminates decompression after the given number of commands.
- [relative], if True, effectively is ringless.
- """
- output, buffer = bytearray(), bytearray(ring)
- # set up ring buffer
- if fill:
- from itertools import repeat
- if isinstance(fill, str):
- fill = fill.encode()
- if isinstance(fill, (bytes, bytearray)):
- fill = fill[0]
- buffer[0:idx] = repeat(fill, idx)
- bits = 0
- c_mask = 0
- c_refill = 1 if bitorder=='bottom' else 0x80
- d = iter(data)
- r_mask = ring-1
- threshold += 1
- b_shift = r_mask.bit_length() - 8
- s_mask = (0xFF >> b_shift) & 0xFF
- b_mask = s_mask ^ 0xFF
- try:
- while count:
- #if bits<2:
- # bits = next(d) | 0x100
- #if(bits&1):
- if not c_mask:
- bits = next(d)
- c_mask = c_refill
- count -= 1
- if bool(bits&c_mask) == condition:
- v = next(d)
- buffer[idx]=v
- idx=(idx+1)&r_mask
- output.append(v)
- else:
- if byteorder == 'little':
- val = next(d)
- size = next(d)
- else:
- size = next(d)
- val = next(d)
- back = (size & b_mask) << b_shift
- back &= 0xFF00
- back |= val
- # back &= r_mask
- size = (size & s_mask) + threshold
- if relative:
- for x in range(size):
- buffer[idx] = output[-back]
- output.append(output[-back])
- idx=(idx+1)&r_mask
- else:
- for x in range(size):
- buffer[idx] = buffer[back]
- output.append(buffer[back])
- idx=(idx+1)&r_mask
- back = (back+1)&r_mask
- #bits>>=1
- if bitorder == 'bottom':
- c_mask <<= 1
- else:
- c_mask >>= 1
- c_mask &= 0xFF
- if out_sz:
- if out_sz<=len(output):
- break
- except IndexError:
- import warnings
- warnings.warn("\tError! Insufficient data.\n Probably not an LZSS file.\n")
- return bytes()
- except StopIteration as E:
- if out_sz:
- raise E
- return bytes(output)
- def length(data, out_sz, ring, threshold, byteorder, bitorder='bottom', condition=True, count=-1):
- """
- Returns (compressed size, decompressed size).
- if [out_sz] is None, will decompress to the end of file.
- [ring], [threshold], and [byteorder]
- should match the original compression settings.
- [ring] may be any size between 0x4000 and 0x100.
- [threshold] is the shortest acceptable copy, minus one.
- [byteorder] sets the byteorder copy commands are written in.
- Usually this is 'little', but 'big' is also acceptable.
- [bitorder] indicates if command bits are read from the 'bottom' or 'top'.
- [condition] sets if a literal is written when a flag is True or False.
- [count], when set, terminates decompression after the given number of commands.
- """
- sz, out = 0, 0
- bits = 0
- c_mask = 0
- c_refill = 1 if bitorder=='bottom' else 0x80
- r_mask = ring-1
- threshold += 1
- b_shift = r_mask.bit_length() - 8
- s_mask = (0xFF >> b_shift) & 0xFF
- try:
- while count:
- if not c_mask:
- bits = data[sz]
- sz += 1
- c_mask = c_refill
- count -= 1
- if bool(bits&c_mask) == condition:
- v = data[sz]
- sz += 1
- out += 1
- else:
- if byteorder == 'little':
- val = data[sz]
- size = data[sz+1]
- else:
- size = data[sz]
- val = data[sz+1]
- sz += 2
- out += (size & s_mask) + threshold
- if bitorder == 'bottom':
- c_mask <<= 1
- else:
- c_mask >>= 1
- c_mask &= 0xFF
- if out_sz:
- if out_sz<=out:
- break
- except Exception as E:
- print(E)
- # Still return progress to this point.
- return (sz, out)
- __modes = {'txt':0x20, 'bin':0}
- def encode1KB(data, mode='bin', **kwargs):
- """Compresses [data] using 1KB ring and given fill mode,
- returning a bytes object.
- [mode] may be:
- 'txt': fills buffer with spaces (b' ', or ASCII code 0x20)
- 'bin': fills buffer with zeroes
- same as:
- encode(data, 1024, 66, 2, 958, mode, 'little', False)
- """
- f = __modes.get(mode)
- return encode(data, ring=1024, limit=66, threshold=2, idx=kwargs.pop('idx', 958), fill=f, byteorder=kwargs.pop('byteorder', 'little'), condition=kwargs.pop('condition', False), **kwargs)
- def decode1KB(data, out_sz=None, mode='bin', **kwargs):
- """Decompresses [data] using 1KB ring and given fill [mode],
- returning a bytes object.
- If [out_sz] is not given, decompresses to the end of data.
- [mode] may be:
- 'txt': fills buffer with spaces (b' ', or ASCII code 0x20)
- 'bin': fills buffer with zeroes
- same as:
- decode(data, out_sz, 1024, 2, 958, mode, 'little', 'bottom', False)
- """
- f = __modes.get(mode)
- return decode(data, out_sz, ring=1024, threshold=2, idx=kwargs.pop('idx', 958), fill=f, byteorder=kwargs.pop('byteorder', 'little'), condition=kwargs.pop('condition', False), **kwargs)
- def length1KB(data, out_sz=None, **kwargs):
- """Returns (compressed size, decompressed size) of [data] using 1KB ring,
- basically by faking decompression.
- If [out_sz] is not given, "decompresses" to the end of data.
- same as:
- length(data, out_sz, 1024, 2, 'little', 'bottom', False)
- """
- return length(data, out_sz, ring=1024, threshold=2, byteorder=kwargs.pop('byteorder', 'little'), condition=kwargs.pop('condition', False), **kwargs)
- def main():
- pass
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment