Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- from __future__ import print_function
- import sys
- import os
- import re
- import io
- import base64
- import zlib
- import yaml as y
- from minecraft import SUPPORTED_PROTOCOL_VERSIONS
- from minecraft.networking import types, packets, connection
- from minecraft.networking.packets import clientbound, serverbound
# Indices identifying which side of the connection sent a chunk of data.
CLIENT = 0
SERVER = 1

# Connection states, used as keys into the per-state packet tables.
HANDSHAKE = 0
STATUS = 1
LOGIN = 2
PLAY = 3

# Matches a capture key of the form 'peer<P>_<N>': the first group is the
# peer index, the second is that peer's chunk sequence number.
PEER_RE = re.compile(r'peer(\d+)_(\d+)$')
def parse_file(file):
    """Decode and print the packets stored in a YAML-encoded stream capture.

    The document must be a single top-level mapping whose keys look like
    ``peer<P>_<N>`` (``P`` is the peer index, ``N`` is that peer's chunk
    counter, starting at 0 and increasing by 1) and whose values are
    ``!!binary`` scalars holding the raw bytes sent by that peer.
    NOTE(review): this looks like a Wireshark "Follow TCP Stream" YAML
    export -- confirm against the capture source.

    Each chunk is appended to its peer's byte stream, and every complete
    packet found there is decoded and printed, prefixed with ``<--`` for
    ``CLIENT`` data and ``-->`` for ``SERVER`` data.  Handshake, login
    success and set-compression packets update the protocol version,
    connection state and compression flag used for later packets.

    Args:
        file: A stream or string accepted by ``yaml.parse``.

    Raises:
        AssertionError: If the document structure, chunk numbering or
            connection state is not as expected.
        ValueError: If a mapping key does not match ``PEER_RE``.
    """
    events = y.parse(file, y.SafeLoader)
    for etype in y.StreamStartEvent, y.DocumentStartEvent, y.MappingStartEvent:
        event = next(events)
        assert isinstance(event, etype), 'not isinstance(%r, %r)' % (event, etype)

    streams = {CLIENT: io.BytesIO(), SERVER: io.BytesIO()}
    counts = {CLIENT: 0, SERVER: 0}
    context = connection.ConnectionContext(
        protocol_version=max(SUPPORTED_PROTOCOL_VERSIONS))
    compression = False
    state = HANDSHAKE
    packet_map = get_packets(context)

    for event1 in events:
        # The end of the top-level mapping marks the end of the capture;
        # without this check the scalar assertion below would always fail
        # on a well-formed document.
        if isinstance(event1, y.MappingEndEvent):
            break
        assert isinstance(event1, y.ScalarEvent), \
            'not isinstance(%r, y.ScalarEvent)' % event1
        key_match = PEER_RE.match(event1.value)
        if key_match is None:
            raise ValueError('unexpected key in capture: %r' % event1.value)
        peer, index = map(int, key_match.groups())
        assert counts[peer] == index, \
            'counts[%r] == %r != %r' % (peer, counts[peer], index)
        counts[peer] += 1

        event2 = next(events)
        assert isinstance(event2, y.ScalarEvent), \
            'not isinstance(%r, y.ScalarEvent)' % event2
        assert event2.tag == 'tag:yaml.org,2002:binary'

        # Append the new chunk, then rewind so that parsing restarts from
        # the first byte not yet consumed by a complete packet.
        streams[peer].seek(0, io.SEEK_END)
        streams[peer].write(base64.b64decode(event2.value))
        streams[peer].seek(0)

        arrow = '<--' if peer == CLIENT else '-->'
        try:
            while True:
                # read_packet yields None while the stream does not yet
                # hold a whole packet; its first result tells us whether
                # a packet is available now.
                packet = next(read_packet(
                    stream=streams[peer],
                    compression=compression,
                    packet_map=packet_map[peer][state],
                    context=context))
                if packet is None:
                    break

                # Drop the bytes of the packet just decoded, keeping only
                # the remainder for the next iteration.
                streams[peer] = io.BytesIO(streams[peer].read())

                if isinstance(packet, serverbound.handshake.HandShakePacket):
                    assert state == HANDSHAKE
                    context.protocol_version = packet.protocol_version
                    packet_map = get_packets(context)
                    state = packet.next_state
                elif isinstance(packet, clientbound.login.LoginSuccessPacket):
                    assert state == LOGIN
                    state = PLAY
                elif packet.packet_name == 'set compression':
                    assert state in (LOGIN, PLAY)
                    # Only a negative threshold disables compression; a
                    # threshold of zero still uses the compressed format.
                    compression = packet.threshold >= 0

                print('%s %s' % (arrow, packet))
        except Exception:
            print(arrow, end=' ***Exception:\n')
            raise

    # Each stream holds exactly the bytes not consumed by a complete packet.
    # getvalue() is used rather than read() because a partial packet read
    # may have advanced the cursor past some of those bytes.
    unused = {p: s.getvalue() for (p, s) in streams.items()}
    if any(len(s) > 0 for s in unused.values()):
        print('Unused data at end of stream: %r' % unused)
def read_packet(stream, compression, packet_map, context):
    """Generator that decodes one packet from ``stream``.

    Yields ``None`` each time ``read_packet_buffer`` reports that the packet
    is still incomplete, and finally yields exactly one packet object:

    * an instance of ``packet_map[packet_id]`` populated via its ``read``
      method when the packet ID is present in ``packet_map``;
    * a generic ``packets.Packet`` carrying the ID otherwise;
    * a generic ``packets.Packet`` without an ID when decompression fails
      with ``zlib.error`` (the error is printed, not raised).

    Args:
        stream: Byte stream to read the packet from.
        compression: Whether the compressed packet format is in effect.
        packet_map: Mapping from packet ID to packet class.
        context: Connection context handed to packet constructors.
    """
    try:
        for payload in read_packet_buffer(stream, compression):
            if payload is None:
                # Not enough data yet; let the caller decide what to do.
                yield None
                continue
            break
    except zlib.error as err:
        print('*** Exception: %s' % err)
        result = packets.Packet(context)
    else:
        ident = next(read_varint(payload))
        assert ident is not None
        if ident not in packet_map:
            # Unknown packet type: keep only its ID.
            result = packets.Packet(context, id=ident)
        else:
            result = packet_map[ident](context)
            result.read(payload)
    yield result
def read_packet_buffer(stream, compression):
    """Generator that reads one length-prefixed packet body from ``stream``.

    Yields ``None`` whenever the length prefix or the body is not yet fully
    available, and finally yields a ``packets.PacketBuffer`` holding the
    packet body with its cursor reset.  When ``compression`` is true, the
    body starts with a VarInt data length; if that length is positive the
    rest of the body is zlib-decompressed and replaces the buffer contents.

    Args:
        stream: Byte stream to read from.
        compression: Whether the body uses the compressed packet format.

    Raises:
        zlib.error: If decompression of the body fails.
        AssertionError: If the body or the decompressed data does not have
            the announced length.
    """
    # Wait until the VarInt length prefix can be read in full.
    for expected in read_varint(stream):
        if expected is None:
            yield None
        else:
            break

    frame = packets.PacketBuffer()
    while True:
        missing = expected - len(frame.get_writable())
        frame.send(stream.read(missing))
        if len(frame.get_writable()) >= expected:
            break
        yield None
    assert len(frame.get_writable()) == expected
    frame.reset_cursor()

    if not compression:
        yield frame
        return

    inflated_size = next(read_varint(frame))
    assert inflated_size is not None
    if inflated_size > 0:
        # A positive data length means the remainder is zlib-compressed.
        inflated = zlib.decompress(frame.read())
        assert len(inflated) == inflated_size
        frame.reset()
        frame.send(inflated)
        frame.reset_cursor()
    yield frame
def read_varint(stream):
    """Generator that decodes one VarInt from ``stream``.

    Bytes are read one at a time; each contributes its low 7 bits, least
    significant group first, and a clear high bit marks the final byte.
    Yields ``None`` every time a read returns no data, then yields the
    decoded (unsigned) integer as its last item.

    Args:
        stream: Object whose ``read(1)`` returns a bytes object.

    Raises:
        Exception: If five bytes are read and all have the high bit set.
    """
    result = 0
    shift = 0
    while shift < 35:  # at most five 7-bit groups
        chunk = stream.read(1)
        while not chunk:
            # No data available right now; report it and try again.
            yield None
            chunk = stream.read(1)
        octet = chunk[0]
        result |= (octet & 0x7F) << shift
        if not octet & 0x80:
            yield result
            return
        shift += 7
    raise Exception('VarInt overflow.')
def get_packets(context):
    """Build the packet lookup tables for ``context``.

    Returns a nested dict indexed as ``result[peer][state][packet_id]``
    whose leaves are packet classes.  ``CLIENT`` entries are taken from the
    ``serverbound`` packet modules and ``SERVER`` entries from the
    ``clientbound`` ones; packet IDs come from each class's
    ``get_id(context)``.

    Args:
        context: Connection context passed to ``get_packets`` and ``get_id``.
    """
    lookup = {}
    for peer, direction in ((CLIENT, serverbound), (SERVER, clientbound)):
        state_modules = (
            (HANDSHAKE, direction.handshake),
            (LOGIN, direction.login),
            (STATUS, direction.status),
            (PLAY, direction.play),
        )
        by_state = {}
        for state, module in state_modules:
            by_id = {}
            for packet_cls in module.get_packets(context):
                by_id[packet_cls.get_id(context)] = packet_cls
            by_state[state] = by_id
        lookup[peer] = by_state
    return lookup
def main(*args):
    """Parse each file named in ``args``, or standard input if none given.

    When file names are supplied, each name is printed followed by a colon
    before that file is parsed.

    Args:
        *args: Paths of the capture files to parse.
    """
    if len(args) == 0:
        parse_file(sys.stdin)
        return
    for path in args:
        print('%s:' % path)
        with open(path) as handle:
            parse_file(handle)


if __name__ == '__main__':
    main(*sys.argv[1:])
Add Comment
Please, Sign In to add comment