Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- import socket
- from os import urandom
- from io import BytesIO
- from uuid import uuid4
- from hashlib import sha1
- from os.path import isfile
- from json import load, dump
- from enum import Enum, IntEnum
- from struct import pack, unpack
- from binascii import hexlify as _hexlify
- from Crypto.PublicKey import RSA
- from Crypto.Cipher import AES, PKCS1_v1_5
- import requests
# Root of the Mojang authentication API; the Yggdrasil methods append their
# endpoint path ("/authenticate", "/validate", ...) to it.
AUTH_URL = "https://authserver.mojang.com"
# Root of the Mojang session server API. Not referenced by the code visible
# in this file -- presumably intended for the "join" request of the login
# handshake (TODO confirm).
SESSION_URL = "https://sessionserver.mojang.com/session/minecraft"
def rshift(val, n, bits=32):
    """Logical (zero-filling) right shift of a fixed-width integer.

    Python's ``>>`` is arithmetic: shifting a negative number keeps it
    negative. This helper first reinterprets a negative *val* as its
    unsigned two's-complement value of width *bits*, so the vacated high
    bits are filled with zeros (the equivalent of Java's ``>>>``).

    :param val: the integer to shift; non-negative values are shifted as-is
    :param n: number of bit positions to shift right
    :param bits: word width used to reinterpret negative values (32 by
        default; pass 64 for "long" values)
    :return: the shifted, non-negative result for in-range input
    """
    if val >= 0:
        return val >> n
    return (val + (1 << bits)) >> n
def hexlify(b: (bytes, bytearray)) -> str:
    """Return the lowercase hexadecimal representation of *b* as text.

    Thin wrapper around ``binascii.hexlify`` (imported as ``_hexlify``),
    which yields ``bytes``; the result is decoded so callers get a ``str``.
    """
    hex_bytes = _hexlify(b)
    return hex_bytes.decode("utf8")
# Probably only needed in one place (the SHA-1 server hash).
def bytes_to_num(b: (bytes, bytearray)) -> int:
    """Interpret *b* as a big-endian, two's-complement signed integer."""
    return int.from_bytes(b, "big", signed=True)
def minecraft_sha1_hash_digest(b: (bytes, bytearray)) -> str:
    """Return the SHA-1 of *b* as a signed hexadecimal string.

    The 20-byte digest is read as one big-endian two's-complement integer
    and printed in base 16, so the result has no zero padding and carries a
    leading ``-`` when the digest's top bit is set.
    """
    digest = sha1(b).digest()
    signed_value = int.from_bytes(digest, byteorder="big", signed=True)
    return "{:x}".format(signed_value)
class ClientPacket(IntEnum):
    """Packet IDs of the packets this client sends."""

    # Written as the packet ID of the handshake packet.
    HANDSHAKE = 0x00
    # Written as the packet ID of the login-start packet. It shares the value
    # 0x00 with HANDSHAKE, so the enum machinery makes LOGIN an alias of
    # HANDSHAKE rather than a distinct member.
    LOGIN = 0x00
    # Written as the packet ID of the encryption-response packet.
    ENCRYPTION_RESPONSE = 0x01
class ServerPacket(IntEnum):
    """Packet IDs of the packets this client expects from the server."""

    # The server's encryption request (server ID, public key, verify token).
    ENCRYPTION_REQUEST = 0x01
class Endian(Enum):
    """Byte-order prefixes accepted by ``struct`` format strings."""

    LITTLE = "<"
    BIG = ">"
    NETWORK = "!"

    def __str__(self) -> str:
        # Render as the bare prefix character so a member can be turned
        # into the leading character of a struct format string.
        return str(self.value)
class MinecraftIO(BytesIO):
    """In-memory byte stream with readers/writers for Minecraft wire types.

    Extends ``io.BytesIO`` with fixed-width unsigned integers (packed with
    ``struct`` using the configured byte order), length-prefixed strings,
    packed block positions, and the variable-length VarInt / VarLong
    integers. VarInt and VarLong are treated as signed 32-bit and 64-bit
    two's-complement values respectively; their encoding is independent of
    the configured byte order.

    :param endian: optional keyword argument, an ``Endian`` member (or its
        string value) selecting the byte order for fixed-width values;
        little-endian when omitted. All other arguments go to ``BytesIO``.
    """

    # struct byte-order prefix used by the fixed-width helpers. Kept as a
    # plain string because that is what set_endian() stores; "<" is the
    # value of Endian.LITTLE.
    endian = "<"

    def __init__(self, *args, **kwargs):
        # Always strip our own keyword before delegating, so it can never
        # leak into BytesIO.__init__ (which would reject it).
        endian = kwargs.pop("endian", None)
        super(MinecraftIO, self).__init__(*args, **kwargs)
        if endian is not None:
            self.set_endian(endian)

    def set_endian(self, endian: "Endian") -> None:
        """Select the byte order used by the fixed-width readers/writers.

        :param endian: an ``Endian`` member or its string value
        :raises ValueError: if *endian* is not a known byte order
        """
        self.endian = str(Endian(endian))

    def get_endian(self) -> "Endian":
        """Return the current byte order as an ``Endian`` member."""
        return Endian(self.endian)

    # ------------------------------------------------------------------
    # shared VarInt / VarLong machinery
    # ------------------------------------------------------------------
    def _read_var(self, bits: int, too_long: str) -> int:
        """Decode a base-128 varint of width *bits* as a signed integer.

        Each byte carries 7 payload bits, least-significant group first;
        the high bit marks that another byte follows.

        :raises ValueError: with message *too_long* if the continuation bit
            is still set on the last byte a *bits*-wide value may occupy
        """
        max_bytes = (bits + 6) // 7  # 5 for 32 bits, 10 for 64 bits
        number = 0
        for index in range(max_bytes):
            byte = self.read(1)[0]
            number |= (byte & 0x7F) << (7 * index)
            if not byte & 0x80:
                break
        else:
            raise ValueError(too_long)
        # Drop any excess bits from the final group, then sign-extend.
        number &= (1 << bits) - 1
        if number >= 1 << (bits - 1):
            number -= 1 << bits
        return number

    def _write_var(self, value: int, bits: int, kind: str) -> int:
        """Encode *value* as a base-128 varint of width *bits*.

        Negative values are written in two's complement, so they always
        occupy the maximum number of bytes.

        :return: number of bytes written
        :raises ValueError: if *value* does not fit in *bits* bits (either
            as a signed or as an unsigned quantity)
        """
        if not -(1 << (bits - 1)) <= value < (1 << bits):
            raise ValueError("%s out of range: %d" % (kind, value))
        value &= (1 << bits) - 1
        out = bytearray()
        while True:
            byte = value & 0x7F
            value >>= 7
            if value:
                byte |= 0x80  # more groups follow
            out.append(byte)
            if not value:
                break
        return self.write(out)

    # ------------------------------------------------------------------
    # read
    # ------------------------------------------------------------------
    def read_bytes(self, count: int) -> (bytes, bytearray):
        """Read up to *count* raw bytes."""
        return self.read(count)

    def read_byte(self) -> int:
        """Read one unsigned byte (alias of :meth:`read_uint8`)."""
        return self.read_uint8()

    def read_uint8(self) -> int:
        """Read one unsigned byte."""
        return self.read(1)[0]

    def read_string(self, encoding: str = "ascii") -> str:
        """Read a VarInt byte length followed by that many encoded bytes."""
        return str(self.read(self.read_varint()), encoding)

    def read_ushort(self) -> int:
        """Read an unsigned 16-bit integer (alias of :meth:`read_uint16`)."""
        return self.read_uint16()

    def read_uint16(self) -> int:
        """Read an unsigned 16-bit integer in the configured byte order."""
        return unpack(self.endian + "H", self.read(2))[0]

    def read_ulong(self) -> int:
        """Read an unsigned 32-bit integer (alias of :meth:`read_uint32`)."""
        return self.read_uint32()

    def read_uint32(self) -> int:
        """Read an unsigned 32-bit integer in the configured byte order."""
        return unpack(self.endian + "L", self.read(4))[0]

    def read_ulonglong(self) -> int:
        """Read an unsigned 64-bit integer (alias of :meth:`read_uint64`)."""
        return self.read_uint64()

    def read_uint64(self) -> int:
        """Read an unsigned 64-bit integer in the configured byte order."""
        return unpack(self.endian + "Q", self.read(8))[0]

    def read_position(self) -> (tuple, list):
        """Read a packed position and return it as ``(x, y, z)``.

        The 64-bit value holds x in the top 26 bits, y in the next 12 bits
        and z in the low 26 bits, each a two's-complement signed field.
        """
        val = self.read_ulonglong()
        x = val >> 38
        y = (val >> 26) & 0xFFF
        z = val & 0x3FFFFFF
        # Python integers are unbounded, so sign-extend each field by hand.
        if x >= 1 << 25:
            x -= 1 << 26
        if y >= 1 << 11:
            y -= 1 << 12
        if z >= 1 << 25:
            z -= 1 << 26
        return (x, y, z)

    def read_varint(self) -> int:
        """Read a VarInt (at most 5 bytes) as a signed 32-bit integer.

        :raises ValueError: if the encoding runs past 5 bytes
        """
        return self._read_var(32, "Tried to read too long of a VarInt")

    # ------------------------------------------------------------------
    # write
    # ------------------------------------------------------------------
    def write_bytes(self, values: (bytes, bytearray)) -> int:
        """Write raw bytes; return the number of bytes written."""
        return self.write(values)

    def write_byte(self, value: int) -> int:
        """Write one unsigned byte (alias of :meth:`write_uint8`)."""
        # Must delegate to write_uint8; calling write_byte here would
        # recurse without end.
        return self.write_uint8(value)

    def write_uint8(self, value: int) -> int:
        """Write one unsigned byte; return the number of bytes written."""
        return self.write(bytes([value]))

    def write_string(self, s: str, encoding: str = "ascii") -> int:
        """Write *s* as a VarInt byte length followed by the encoded bytes.

        :return: total number of bytes written (prefix + payload)
        """
        # The prefix counts encoded bytes, not characters; the two differ
        # for any multi-byte encoding and read_string() reads bytes.
        encoded = bytes(s, encoding)
        count = self.write_varint(len(encoded))
        count += self.write(encoded)
        return count

    def write_ushort(self, value: int) -> int:
        """Write an unsigned 16-bit integer (alias of :meth:`write_uint16`)."""
        return self.write_uint16(value)

    def write_uint16(self, value: int) -> int:
        """Write an unsigned 16-bit integer in the configured byte order."""
        return self.write(pack(self.endian + "H", value))

    def write_ulong(self, value: int) -> int:
        """Write an unsigned 32-bit integer (alias of :meth:`write_uint32`)."""
        return self.write_uint32(value)

    def write_uint32(self, value: int) -> int:
        """Write an unsigned 32-bit integer in the configured byte order."""
        return self.write(pack(self.endian + "L", value))

    def write_ulonglong(self, value: int) -> int:
        """Write an unsigned 64-bit integer (alias of :meth:`write_uint64`)."""
        return self.write_uint64(value)

    def write_uint64(self, value: int) -> int:
        """Write an unsigned 64-bit integer in the configured byte order."""
        return self.write(pack(self.endian + "Q", value))

    def write_position(self, x: int, y: int, z: int) -> int:
        """Write ``(x, y, z)`` packed into 64 bits (26 / 12 / 26 bits).

        Each coordinate is masked to its field width, which also converts
        negative values to two's complement.
        """
        packed = ((x & 0x3FFFFFF) << 38) | ((y & 0xFFF) << 26) | (z & 0x3FFFFFF)
        return self.write_ulonglong(packed)

    def write_varint(self, value: int) -> int:
        """Write *value* as a VarInt; return the number of bytes written.

        :raises ValueError: if *value* does not fit in 32 bits
        """
        return self._write_var(value, 32, "VarInt")

    # don't know if these will be used
    def read_varlong(self) -> int:
        """Read a VarLong (at most 10 bytes) as a signed 64-bit integer.

        :raises ValueError: if the encoding runs past 10 bytes
        """
        return self._read_var(64, "VarLong is too big")

    def write_varlong(self, value: int) -> None:
        """Write *value* as a VarLong.

        :raises ValueError: if *value* does not fit in 64 bits
        """
        self._write_var(value, 64, "VarLong")
#documentation: http://wiki.vg/Authentication
class Yggdrasil(object):
    """Client for the Mojang authentication endpoints under ``AUTH_URL``.

    Holds a username/password pair and exposes one method per endpoint:
    ``authenticate``, ``validate``, ``refresh``, ``signout`` and
    ``invalidate``. Every call is a JSON ``POST`` made with ``requests``.
    """

    username = None
    password = None
    # Seconds to wait on the auth server before requests raises a timeout;
    # without it a stalled connection would block forever. Override on the
    # class or an instance if a different limit is needed.
    timeout = 30

    def __init__(self, username: str, password: str) -> None:
        """
        :param username: account name (the e-mail address for migrated accounts)
        :param password: account password
        """
        self.username = username
        self.password = password

    def _post(self, endpoint: str, json_data: dict):
        """POST *json_data* as JSON to ``AUTH_URL + endpoint``; return the response."""
        headers = {"Content-Type": "application/json"}
        return requests.post(AUTH_URL + endpoint, json=json_data, headers=headers,
                             timeout=self.timeout)

    @staticmethod
    def _token_payload(access_token: str, client_token: str = None) -> dict:
        """Build the request body shared by validate / refresh / invalidate."""
        json_data = {
            "accessToken": access_token
        }
        # The client token is optional; only send it when one was supplied.
        if isinstance(client_token, str):
            json_data["clientToken"] = client_token
        return json_data

    @staticmethod
    def _raise_on_error(response: dict, action: str) -> None:
        """Raise ``Exception`` if *response* is an error document.

        A response counts as an error when it contains any of the keys
        ``error``, ``errorMessage`` or ``cause``. Keys are read with
        ``.get`` so a partial error document cannot cause a ``KeyError``.

        :param action: word used in the message, e.g. ``"Login"``
        """
        if not any(key in response for key in ["error", "errorMessage", "cause"]):
            return
        if response.get("error") == "ForbiddenOperationException":
            if response.get("cause") == "UserMigratedException":
                raise Exception("Account is migrated use your email instead")
        message = response.get("errorMessage", response.get("error", "unknown error"))
        raise Exception(action + " failed: \"" + str(message) + "\"")

    @staticmethod
    def _session_from(response: dict) -> dict:
        """Pick the session fields out of an authenticate / refresh response."""
        return {
            "client_token": response["clientToken"],
            "access_token": response["accessToken"],
            "selected_profile": response["selectedProfile"]
        }

    def authenticate(self, client_token: str = None) -> dict:
        """
        Authenticate with the server using a given username and password
        :param client_token: Use a specified client token otherwise it will be a random uuid4
        :return: client token, access token, and selected profile
        :raises Exception: if the server answers with an error document
        """
        json_data = {
            "agent": {
                "name": "Minecraft",
                "version": 1
            },
            "username": self.username,
            "password": self.password,
            "clientToken": client_token if client_token is not None else str(uuid4())
        }
        response = self._post("/authenticate", json_data).json()
        self._raise_on_error(response, "Login")
        return self._session_from(response)

    def validate(self, access_token: str, client_token: str = None) -> bool:
        """
        Validates a given access token and client token pair
        :param access_token: The access token to verify
        :param client_token: The client token tied to the access token
        :return: true if valid otherwise false
        """
        response = self._post("/validate", self._token_payload(access_token, client_token))
        return response.status_code == 204

    def refresh(self, access_token: str, client_token: str = None) -> dict:
        """
        Refreshes an access token and client token pair
        :param access_token: The access token to refresh
        :param client_token: The client token tied to the access token
        :return: client token, access token, and selected profile
        :raises Exception: if the server answers with an error document
        """
        response = self._post("/refresh", self._token_payload(access_token, client_token)).json()
        # Report a rejected token with the server's message instead of
        # failing later with a KeyError on the missing token fields.
        self._raise_on_error(response, "Refresh")
        return self._session_from(response)

    def signout(self) -> bool:
        """
        Signs out and invalidates any current sessions
        :return: true if successful otherwise false
        """
        json_data = {
            "username": self.username,
            "password": self.password
        }
        response = self._post("/signout", json_data)
        return response.status_code == 200 and response.text == ""

    def invalidate(self, access_token: str, client_token: str = None) -> bool:
        """
        Invalidates a specific access token and client token pair
        :param access_token: The access token to invalidate
        :param client_token: The client token tied to the access token
        :return: true if successful otherwise false
        """
        response = self._post("/invalidate", self._token_payload(access_token, client_token))
        return response.status_code == 200 and response.text == ""
def _frame_packet(packet_id: int, data: bytes = b"") -> bytes:
    """Return *data* framed as one packet: VarInt length + VarInt ID + data."""
    body_io = MinecraftIO(endian=Endian.NETWORK)
    body_io.write_varint(int(packet_id))  # packet ID
    body_io.write(data)                   # data
    body = body_io.getvalue()
    length_io = MinecraftIO(endian=Endian.NETWORK)
    length_io.write_varint(len(body))     # length of packet ID + data
    return length_io.getvalue() + body


def _recv_exact(sock, count: int) -> bytes:
    """Receive exactly *count* bytes from *sock* or raise ConnectionError."""
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("Connection closed in the middle of a packet")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def _recv_packet(sock) -> bytes:
    """Receive one length-prefixed packet and return packet ID + data.

    TCP is a byte stream, so one ``recv`` call may return less (or more)
    than a packet; the VarInt length prefix is read byte by byte and the
    body is then read to exactly that length.
    """
    length = 0
    for index in range(5):  # a VarInt is at most 5 bytes long
        byte = _recv_exact(sock, 1)[0]
        length |= (byte & 0x7F) << (7 * index)
        if not byte & 0x80:
            return _recv_exact(sock, length)
    raise ValueError("Packet length VarInt is too long")


if __name__ == "__main__":
    username = "" #email if migrated
    password = ""
    session_file = "session.json"
    # Disabled: authenticate with Mojang and cache the session on disk.
    # ygg = Yggdrasil(username, password)
    # if not isfile(session_file):
    #     login_data = ygg.authenticate()
    #     dump(login_data, open(session_file, "w"))
    #     print("Authenticated with Mojang!")
    # else:
    #     login_data = load(open(session_file, "r"))
    #     if ygg.validate(login_data["access_token"], login_data["client_token"]):
    #         print("Session is still valid!")
    #     else:
    #         print("Session is invalid, refreshing...")
    #         login_data = ygg.refresh(login_data["access_token"], login_data["client_token"])
    #         dump(login_data, open(session_file, "w"))
    #         print("Session refreshed!")
    # print("%s is logged in!" % (login_data["selected_profile"]["name"]))

    host = "mineheroes.net"
    port = 25565

    # The context manager closes the socket even if a step below raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))

        #handshake
        mc_io = MinecraftIO(endian=Endian.NETWORK)
        mc_io.write_varint(340) #protocol version
        mc_io.write_string(host) #server hostname
        mc_io.write_ushort(port) #server port
        mc_io.write_varint(2) #next state
        # sendall() keeps writing until the whole packet is out; send() may
        # stop after a partial write.
        sock.sendall(_frame_packet(ClientPacket.HANDSHAKE, mc_io.getvalue()))

        #login start
        mc_io = MinecraftIO(endian=Endian.NETWORK)
        mc_io.write_string("Visual_Studio")
        sock.sendall(_frame_packet(ClientPacket.LOGIN, mc_io.getvalue()))

        #encryption request
        mc_io = MinecraftIO(_recv_packet(sock), endian=Endian.NETWORK)
        packet_id = mc_io.read_varint()
        if packet_id != ServerPacket.ENCRYPTION_REQUEST:
            # Anything else would be parsed into garbage fields below.
            raise SystemExit("Expected an encryption request (0x01) but got packet ID 0x%02x" % packet_id)
        server_id = mc_io.read_string()
        public_key = mc_io.read(mc_io.read_varint())
        print("Public Key: " + hexlify(public_key))
        verify_token = mc_io.read(mc_io.read_varint())
        print("Verify Token: " + hexlify(verify_token))

        shared_key = urandom(16)
        print("Shared Key: " + hexlify(shared_key))
        # NOTE(review): the hash is only printed here; nothing in the visible
        # code sends it to the session server.
        client_hash = minecraft_sha1_hash_digest(bytes(server_id, "ascii") + shared_key + public_key)
        print("Client Hash: " + client_hash)

        rsa = PKCS1_v1_5.new(RSA.import_key(public_key))
        enc_shared_key = rsa.encrypt(shared_key)
        enc_verify_token = rsa.encrypt(verify_token)

        #encryption response
        mc_io = MinecraftIO(endian=Endian.NETWORK)
        mc_io.write_varint(len(enc_shared_key))
        mc_io.write(enc_shared_key)
        mc_io.write_varint(len(enc_verify_token))
        mc_io.write(enc_verify_token)
        sock.sendall(_frame_packet(ClientPacket.ENCRYPTION_RESPONSE, mc_io.getvalue()))

        print(sock.recv(4096))
Advertisement
Add Comment
Please, Sign In to add comment