Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Source code for nbtparse.minecraft.terrain.region
- from collections import abc as cabc
- import datetime as dt
- import io
- import itertools
- import logging
- import struct
- import time
- import zlib
- from .. import entity_ids
- from . import chunk
- logger = logging.getLogger(__name__)
- OFFSET_LENGTH = 4 # bytes
- TIMESTAMP_LENGTH = 4
- SECTOR_SIZE = 4096 # 4 KiB
- CHUNK_HEADER = 5 # bytes
- GZIP_COMPRESSION = 1
- ZLIB_COMPRESSION = 2
- SIDE_LENGTH = 32 # chunks
- CHUNK_LENGTH = 16 # blocks
- SECTION_HEIGHT = 16
- SIDE_LENGTH_BLOCKS = SIDE_LENGTH * CHUNK_LENGTH
- def _fix_index(index):
- x, z = index
- x = int(x)
- z = int(z)
- x %= SIDE_LENGTH
- z %= SIDE_LENGTH
- return (x, z)
- [docs]class Region(cabc.MutableMapping):
- """A Minecraft region file.
- :obj:`region_x` and :obj:`region_z` should be the region coordinates (as
- they appear in the filename). Standard constructor creates an empty
- region. :meth:`load` loads a region from a file.
- For both loading and saving, files generally need to be seekable. This
- may pose a problem if you want to send a region over the network or
- through a pipe. Depending on your memory and disk resources, you may be
- able to use :class:`io.BytesIO` or :mod:`tempfile` as temporary buffers.
- Contains individual chunks. Access chunk (1, 2), in chunk coordinates,
- as follows::
- r = Region.load(...)
- chunk = r[1, 2]
- No slicing. A :exc:`KeyError` is raised if the chunk does not exist. You
- may use absolute or relative coordinates as you please; they will be
- translated into positive relative coordinates via modulus. You may also
- replace one chunk with another.
- """
- def __init__(self, region_x: int, region_z: int,
- namespace=entity_ids.VANILLA):
- filename = 'r.{}.{}.mca'.format(region_x, region_z)
- self.chunks = {}
- self.timestamps = {}
- self.coords = (region_x, region_z)
- self._namespace = namespace
- @classmethod
- [docs] def load(cls, region_x: int, region_z: int, src: io.BufferedReader,
- namespace=entity_ids.VANILLA):
- """Load a region from disk (or another I/O-like object).
- :obj:`src` must support seeking in both directions.
- """
- if not src.seekable():
- raise ValueError('src must be seekable')
- result = cls(region_x, region_z)
- result._namespace = namespace
- not_present = set()
- offsets = {}
- lengths = {}
- timestamps = {}
- for z, x in itertools.product(range(SIDE_LENGTH), repeat=2):
- logger.debug('Reading offset data for chunk %r, %r', x, z)
- raw = src.read(OFFSET_LENGTH)
- if len(raw) < OFFSET_LENGTH:
- raise RuntimeError('Incomplete region file.')
- high_word, low_byte, length = struct.unpack('>HBB', raw)
- offset = (high_word << 8) | low_byte
- if offset == 0:
- logger.debug('Chunk %r, %r is absent', x, z)
- not_present.add((x, z))
- else:
- offsets[x, z] = offset*SECTOR_SIZE
- lengths[x, z] = length*SECTOR_SIZE
- for z, x in itertools.product(range(SIDE_LENGTH), repeat=2):
- if (x, z) in not_present:
- logger.debug('Skipping timestamp for absent chunk '
- '%r, %r', x, z)
- src.seek(TIMESTAMP_LENGTH, io.SEEK_CUR)
- continue
- logger.debug('Reading timestamp for chunk %r, %r', x, z)
- raw = src.read(TIMESTAMP_LENGTH)
- if len(raw) < TIMESTAMP_LENGTH:
- raise RuntimeError('Incomplete region file.')
- (posix_time,) = struct.unpack('>i', raw)
- timestamp = dt.datetime.fromtimestamp(posix_time,
- dt.timezone.utc)
- logger.debug('Timestamp is %s', timestamp)
- result.timestamps[x, z] = timestamp
- for (x, z), offset in offsets.items():
- result._load_chunk(x, z, src, offset)
- return result
- def _after_load_chunk(self, x: int, z: int) -> None:
- """Hook called right after chunk (x, z) is loaded."""
- self._insert_tiles(x, z)
- def _load_chunk(self, x: int, z: int, src: io.BufferedReader,
- offset: int):
- logger.debug('Loading chunk at %r, %r from %r', x, z, src)
- src.seek(offset)
- header = src.read(CHUNK_HEADER)
- length, compression = struct.unpack('>IB', header)
- compressed = src.read(length - 1)
- logger.debug('Compressed: %r bytes', len(compressed))
- if compression != ZLIB_COMPRESSION:
- raise RuntimeError('Cannot work with compression method {!r}'
- .format(compression))
- if len(compressed) != length - 1:
- raise RuntimeError('Incomplete chunk')
- raw_chunk = zlib.decompress(compressed)
- logger.debug('Uncompressed: %r bytes', len(raw_chunk))
- loaded = chunk.Chunk.from_bytes(raw_chunk, namespace=self._namespace)
- self.chunks[x, z] = loaded
- self._after_load_chunk(x, z)
- def __repr__(self):
- x, z = self.coords
- return '<Region at ({!r}, {!r})>'.format(x, z)
- def _extract_tiles(self, c_x: int, c_z: int):
- """Extract tile entities and prepare to save them to disk."""
- r_x, r_z = self.coords
- chnk = self[c_x, c_z]
- logger.debug('Extracting tile entities from VoxelBuffers in chunk'
- ' (%i, %i)', c_x, c_z)
- # Calculate origin of the chunk
- base_x = r_x*SIDE_LENGTH_BLOCKS + c_x*CHUNK_LENGTH
- base_z = r_z*SIDE_LENGTH_BLOCKS + c_z*CHUNK_LENGTH
- chnk.tiles = []
- for sec_y, section in chnk.sections.items():
- base_y = sec_y*SECTION_HEIGHT
- # Now (base_x, base_y, base_z) is the origin of section.blocks
- tilemap = section.blocks.tilemap
- for (x, y, z), t in tilemap.items():
- block_x = x + base_x
- block_y = y + base_y
- block_z = z + base_z
- t.coords = (block_x, block_y, block_z)
- logger.debug('Extracted tile entity at (%i, %i, %i), '
- 'chunk coordinates (%i, %i, %i), from '
- 'section %i', block_x, block_y, block_z, x,
- y, z, sec_y)
- chnk.tiles.append(t)
- def _remove_empty_sections(self, x: int, z: int) -> None:
- chnk = self[x, z]
- for sec_y, section in list(chnk.items()):
- if section.blocks.empty():
- del chnk[sec_y]
- def _before_save(self, x: int, z: int) -> None:
- """Hook called right before chunk (x, z) is saved."""
- self._remove_empty_sections(x, z)
- self._extract_tiles(x, z)
- def _insert_tiles(self, x, z):
- """Insert tile entities into VoxelBuffers."""
- logger.debug('Inserting tile entities into VoxelBuffers in chunk (%i,'
- ' %i)', x, z)
- result = self[x, z]
- for t in result.tiles:
- block_x, block_y, block_z = t.coords
- chunk_x = block_x % 16
- chunk_y = block_y % 16
- chunk_z = block_z % 16
- section = block_y // 16
- logger.debug('Inserting tile entity at (%i, %i, %i), chunk '
- ' coordinates (%i, %i, %i), into section %i',
- block_x, block_y, block_z, chunk_x, chunk_y, chunk_z,
- section)
- vb = result.sections[section].blocks
- vb.tilemap[chunk_x, chunk_y, chunk_z] = t
- [docs] def clear(self):
- """Mark this region as empty.
- Equivalent to deleting every chunk from the region, but quite a bit
- faster.
- """
- self.chunks.clear()
- self.timestamps.clear()
- [docs] def save(self, dest: io.BufferedWriter):
- """Save the state of this region.
- :obj:`dest` must support seeking in both directions.
- """
- if not dest.seekable():
- raise ValueError('dest must be seekable')
- offset_table = {}
- length_table = {}
- total_offset = 2*SECTOR_SIZE
- # Write the chunks:
- logger.info('Saving chunks')
- for (x, z), chnk in self.items():
- logger.debug('Saving chunk %r, %r at offset %r', x, z,
- total_offset)
- # Loop invariant: total_offset % SECTOR_SIZE == 0
- assert total_offset % SECTOR_SIZE == 0
- dest.seek(total_offset)
- offset_table[x, z] = total_offset // SECTOR_SIZE
- self._before_save(x, z)
- raw_chunk = chnk.to_bytes()
- logger.debug('Uncompressed: %r bytes', len(raw_chunk))
- compressed = zlib.compress(raw_chunk)
- logger.debug('Compressed: %r bytes', len(compressed))
- length = len(compressed) + 1
- # length_table[x, z] = ceil(length / SECTOR_SIZE)
- # But that would create floating-point errors, so:
- length_table[x, z] = (length // SECTOR_SIZE)
- if length % SECTOR_SIZE != 0:
- length_table[x, z] += 1
- header = struct.pack('>IB', length, ZLIB_COMPRESSION)
- total_offset += dest.write(header)
- total_offset += dest.write(compressed)
- if total_offset % SECTOR_SIZE != 0:
- total_offset += SECTOR_SIZE - (total_offset % SECTOR_SIZE)
- dest.seek(0)
- # Now write the header:
- logger.info('Saving header')
- for z, x in itertools.product(range(SIDE_LENGTH), repeat=2):
- if (x, z) not in offset_table:
- logger.debug('Skipping missing chunk %r, %r', x, z)
- dest.write(b'\x00\x00\x00\x00')
- continue
- logger.debug('Writing header for chunk %r, %r', x, z)
- sector_offset = offset_table[x, z]
- sector_length = length_table[x, z]
- high_word = (sector_offset & 0xFFFF00) >> 8
- low_byte = sector_offset & 0xFF
- item = struct.pack('>HBB', high_word, low_byte, sector_length)
- dest.write(item)
- now = int(time.time()) & 0x7FFFFFFF
- # XXX: This will cause a Y2k38 bug, but it's Mojang's fault for
- # using 4-byte timestamps in the first place.
- logger.info('Saving timestamps (as %r)', now)
- for z, x in itertools.product(range(SIDE_LENGTH), repeat=2):
- if (x, z) not in offset_table:
- logger.debug('Skipping missing chunk %r, %r', x, z)
- dest.write(b'\x00\x00\x00\x00')
- continue
- logger.debug('Writing timestamp for chunk %r, %r', x, z)
- item = struct.pack('>i', now)
- dest.write(item)
- def __getitem__(self, index: (int, int)) -> chunk.Chunk:
- x, z = _fix_index(index)
- return self.chunks[x, z]
- def __setitem__(self, index: (int, int), value: chunk.Chunk):
- x, z = _fix_index(index)
- self.chunks[x, z] = value
- def __delitem__(self, index: (int, int)):
- x, z = _fix_index(index)
- del self.chunks[x, z]
- del self.timestamps[x, z]
- def __iter__(self):
- for key in self.chunks.keys():
- yield key
- def __len__(self):
- return len(self.chunks)
- Quick search
- Go
- Enter search terms or a module, class or function name.
- ++++++++++++ CREATED AND CODED BY COLBIY +++++++++++++++++
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement