Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import itertools
- import json
- import lzma
- import zlib
- import re
- import struct
- import sys
- import os
- from collections import Counter
- # from savegame import TTDSavegameDecoder
- SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
def hex_str(s):
    """Render *s* as colon-separated, two-digit lowercase hex values.

    ``bytes`` and ``memoryview`` inputs are iterated directly (their items
    are already integers); any other input is treated as a sequence of
    characters and each one is converted with ``ord``.
    """
    if isinstance(s, (bytes, memoryview)):
        values = iter(s)
    else:
        values = map(ord, s)
    return ':'.join(map('{:02x}'.format, values))
def fmt_str(s):
    """Describe *s* as its UTF-8 text followed by its hex dump in parentheses.

    Falsy input yields an empty string.  If producing the text form fails for
    any reason (for example the bytes are not valid UTF-8, or *s* has no
    ``decode`` method), only the hex dump is returned.
    """
    if s:
        try:
            text = s.decode('utf-8')
            return '%s (%s)' % (text, hex_str(s))
        except Exception:
            # Best effort: fall back to the raw hex representation.
            return hex_str(s)
    return ''
def ttd_unpack(fmt, buf, offset=0):
    """Unpack *buf* with a ``struct`` format extended by three extra markers.

    * ``'&'`` (as the entire format) returns the bytes from *offset* up to,
      but excluding, the final byte of *buf*.
    * ``'z'`` marks a string whose length is stored as a gamma-encoded
      prefix (decoded with ``TTDSavegameDecoder.read_gamma``).
    * ``'l'`` marks a list prefixed by a big-endian 32-bit count, with four
      bytes reserved per entry.

    Each ``'z'``/``'l'`` marker is rewritten into plain ``struct`` codes --
    pad bytes covering the length prefix plus an ``Ns`` field for the
    payload -- and the resulting format is then unpacked in one go.

    Returns ``[slice]`` for the ``'&'`` format; otherwise a pair
    ``(values, expanded_fmt)`` in which every ``bytes`` value has been
    passed through ``fmt_str``.
    """
    assert isinstance(fmt, str), fmt
    if fmt == '&':
        return [buf[offset:-1]]

    # Expand gamma-length-prefixed strings, leftmost marker first, since the
    # position of each one depends on the sizes of everything before it.
    marker = fmt.find('z')
    while marker >= 0:
        head, tail = fmt[:marker], fmt[marker + 1:]
        start = offset + struct.calcsize(head)
        length, prefix_len = TTDSavegameDecoder.read_gamma(buf[start:])
        fmt = '%s%s%ds%s' % (head, 'x' * prefix_len, length, tail)
        marker = fmt.find('z')

    # Expand count-prefixed lists the same way.
    marker = fmt.find('l')
    while marker >= 0:
        head, tail = fmt[:marker], fmt[marker + 1:]
        start = offset + struct.calcsize(head)
        (count,) = struct.unpack_from('>I', buf, offset=start)
        fmt = '%sxxxx%ds%s' % (head, count * 4, tail)
        marker = fmt.find('l')

    values = struct.unpack_from(fmt, buf, offset)
    return [fmt_str(v) if isinstance(v, bytes) else v for v in values], fmt
class TTDSavegameDecoder:
    """Split a compressed savegame into its raw, still-encoded chunks.

    ``decode`` reads the 8-byte header (4-character compression tag followed
    by version bytes), decompresses the remainder and hands it to
    ``_decode_data``, which walks the chunk list and records the raw payload
    of every chunk under its 4-character id.
    """

    def __init__(self):
        pass

    @staticmethod
    def _parse_label(data):
        """Return the first four bytes of *data* as a 4-character string."""
        return ''.join(map(chr, data[:4]))

    @staticmethod
    def read_gamma(data):
        """Decode a variable-length ("gamma") integer at the start of *data*.

        The leading bits of the first byte select the encoded width:
        ``0xxxxxxx`` is 1 byte, ``10xxxxxx`` 2 bytes, ``110xxxxx`` 3 bytes
        and ``1110xxxx`` 4 bytes; the remaining bits are the big-endian value.

        Returns:
            ``(value, number_of_bytes_consumed)``.

        Raises:
            ValueError: if the first byte matches none of the prefixes above.
        """
        first = data[0]
        if (first & 0xF0) == 0xE0:
            return (first & 0x0F) << 24 | data[1] << 16 | data[2] << 8 | data[3], 4
        elif (first & 0xE0) == 0xC0:
            return (first & 0x1F) << 16 | data[1] << 8 | data[2], 3
        elif (first & 0xC0) == 0x80:
            return (first & 0x3F) << 8 | data[1], 2
        elif (first & 0x80) == 0:
            return first & 0x7F, 1
        raise ValueError('Invalid gamma')

    def _read_int(self, data, size, is_signed):
        """Read a big-endian integer of *size* bits (8, 16 or 32) from *data*.

        Returns:
            ``(value, number_of_bytes_consumed)``.

        Raises:
            KeyError: if *size* is not 8, 16 or 32.
        """
        fmt = {8: '>b', 16: '>h', 32: '>i'}[size]
        if not is_signed:
            # struct uses the upper-case code for the unsigned variant.
            fmt = fmt.upper()
        return struct.unpack_from(fmt, data)[0], size // 8

    def _decode_data(self, data, version):
        """Split decompressed savegame *data* into its chunks.

        Each chunk starts with a 4-byte id and a mode byte.  Mode 0 in the
        low nibble is a RIFF chunk: a 24-bit big-endian length, extended by
        the mode byte's high nibble as bits 24-27, followed by the payload.
        Modes 1 (array) and 2 (sparse array) are a sequence of items, each
        prefixed by a gamma-encoded ``size + 1``, terminated by a zero gamma.
        Parsing stops at an all-zero chunk id or at the end of the data.

        Args:
            data: bytes-like decompressed savegame body.
            version: savegame version; currently unused by the parser.

        Returns:
            dict mapping chunk id to a ``memoryview`` of the chunk's raw
            bytes.  For RIFF chunks this is the payload only; for array
            chunks it spans all item prefixes, items and the terminator.

        Raises:
            ValueError: on a duplicate chunk id, an unknown chunk mode or an
                invalid gamma value.
        """
        view = memoryview(data)
        chunks = {}
        while view:
            chunk_id = self._parse_label(view)
            if chunk_id == '\0\0\0\0':
                break
            # Explicit check rather than ``assert`` so it survives ``python -O``.
            if chunk_id in chunks:
                raise ValueError(f"Duplicate chunk {chunk_id}")
            block_mode = view[4]
            view = view[5:]
            if block_mode & 0xF == 0:  # RIFF
                size = view[0] << 16 | view[1] << 8 | view[2] | (block_mode >> 4) << 24
                chunks[chunk_id] = view[3:3 + size]
                view = view[3 + size:]
            elif block_mode in (1, 2):  # array / sparse array
                start = view
                consumed = 0
                while True:
                    item_size, gamma_len = self.read_gamma(view)
                    view = view[gamma_len:]
                    consumed += gamma_len
                    if not item_size:
                        break
                    # The stored size is one larger than the item itself; for
                    # sparse arrays the item's bytes begin with its index.
                    item_size -= 1
                    view = view[item_size:]
                    consumed += item_size
                chunks[chunk_id] = start[:consumed]
            else:
                raise ValueError(f'Unknown chunk mode {block_mode}')
        return chunks

    def decode(self, data):
        """Decode a complete savegame file held in *data*.

        The first four bytes select the compression (``OTTX`` is LZMA,
        ``OTTZ`` is zlib); bytes 4-7 carry the version information and the
        rest is the compressed chunk stream.  The version line is printed
        to stdout.

        Returns:
            ``{'chunks': {chunk_id: memoryview}, 'version': int}``.

        Raises:
            ValueError: if the compression tag is not supported.
        """
        tag = self._parse_label(data)
        decompressor = {
            'OTTX': lzma.LZMADecompressor().decompress,
            'OTTZ': zlib.decompress,
        }.get(tag)
        if not decompressor:
            raise ValueError(f'Unknown savegame format {tag}')
        # The version is a big-endian 16-bit value, so the high byte must be
        # shifted by 8 bits (x256); multiplying by 8 corrupts versions >= 256.
        major_version = data[4] << 8 | data[5]
        # NOTE(review): the last two header bytes are combined the same way;
        # they appear to be a legacy field -- confirm whether only data[6]
        # is meaningful.
        minor_version = data[6] << 8 | data[7]
        print(f'Version: {major_version} {minor_version}')
        res = {}
        res['chunks'] = self._decode_data(decompressor(data[8:]), major_version)
        res['version'] = major_version
        return res
def main():
    """Decode the savegame named by the first CLI argument and report on it.

    Prints the savegame version, then for every chunk a hex preview of its
    first 100 bytes and how well the chunk compresses with LZMA (preset 2,
    CRC32 check), followed by a blank line.
    """
    # Context manager so the file handle is closed as soon as it is read.
    with open(sys.argv[1], 'rb') as savegame_file:
        raw = savegame_file.read()
    savegame = TTDSavegameDecoder().decode(raw)
    print(f'Version: {savegame["version"]}')
    for chunk_id, data in savegame['chunks'].items():
        print(chunk_id, hex_str(data[:100]))
        size = len(data)
        compressor = lzma.LZMACompressor(preset=2, check=lzma.CHECK_CRC32)
        compressed = compressor.compress(data)
        compressed += compressor.flush()
        compressed_size = len(compressed)
        if size:
            ratio = 100 * compressed_size / size
            print(f'{chunk_id} uncompressed: {size} compressed: {compressed_size} ratio: {ratio:.1f}%')
        else:
            # An empty chunk has no meaningful ratio; avoid dividing by zero.
            print(f'{chunk_id} uncompressed: {size} compressed: {compressed_size} ratio: n/a')
        print()


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement