Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- # Requires Python 3.9+ (due to use of memoryview.po
- #
- # Query Valve’s Master Server and then send A2S_INFO to each server to get name/map/player count.
- # Works on Chivalry: Medieval Warfare (AppID 219640) or any Source‑engine title.
- import socket
- import struct
- import sys
- import time
- import tkinter as tk
- from tkinter import ttk
- import threading
### CONFIGURATION ###
# Steam application whose servers are listed (219640 = Chivalry: Medieval Warfare).
APP_ID = 219640
# Valve master server endpoint.
MASTER_HOST = "hl2master.steampowered.com"
MASTER_PORT = 27011
# Region code sent to the master server; 0xFF means "rest of world".
REGION = 0xFF
# Master-server filter: only servers of this game that have no password.
FILTER = f"\\appid\\{APP_ID}\\password\\0"
# Cap on the number of servers requested, so the network is not hammered.
MAX_HOSTS = 30

### PROTOCOL CONSTANTS ###
# Socket timeout in seconds. The identifier's spelling is kept as-is because
# the query functions in this file refer to it by this name.
TIMOUT_SEC = 3.0
MASTER_PING = b"\x31"  # leading byte of a master-server request
END_MARKER = b"\x00"  # NUL terminator appended to request strings
A2S_HEADER = b"\xFF" * 4  # four 0xFF bytes that prefix A2S requests and replies
A2S_INFO = b"TSource Engine Query\x00"  # body of an A2S_INFO request
A2S_PLAYER = b"U"  # request byte for A2S_PLAYER
A2S_RULES = b"V"  # request byte for A2S_RULES
def format_time(duration_sec: float) -> str:
    """
    Render a duration given in seconds as unpadded "[H:]M:S" text.

    - The duration is rounded to whole seconds; negative values become 0.
    - The hours component only appears when it is non-zero:
      307 -> "5:7", 3723 -> "1:2:3".
    - No component is zero-padded: 5 -> "0:5", never "0:05".
    """
    whole_seconds = int(max(0, round(duration_sec)))
    whole_minutes, secs = divmod(whole_seconds, 60)
    hrs, mins = divmod(whole_minutes, 60)
    # Drop the leading hours component entirely when it is zero.
    parts = (hrs, mins, secs) if hrs else (mins, secs)
    return ":".join(str(part) for part in parts)
- ### MASTER SERVER QUERY FUNCTION ###
def query_master(appid: int, region: int, filter_str: str, max_hosts: int = 30, timeout: float = TIMOUT_SEC):
    """
    Generator that yields (ip, port) for each server returned by the Master Server.

    Speaks the Valve Master Server Query Protocol over a UDP socket. Every
    request carries the region byte, a "seed" address (initially "0.0.0.0:0",
    afterwards the last address received) and the filter string. Every reply
    is a 6-byte header followed by 6-byte records (4 bytes IPv4 address,
    2 bytes port in network byte order). The record 0.0.0.0:0 ends the list.

    Parameters:
        appid: Accepted for interface compatibility; it is not used here
            because the application is selected through ``filter_str``.
        region: Region code; only its low 8 bits are sent.
        filter_str: ASCII filter string sent with every request.
        max_hosts: Stop after this many addresses have been yielded.
        timeout: Socket timeout in seconds while waiting for a reply.

    Yields:
        ``(ip, port)`` with ``ip`` a dotted-quad string and ``port`` an int.

    Send/receive failures, timeouts, unexpected reply headers and replies
    without any address record are reported on stderr and end the iteration;
    they are not raised to the caller.
    """
    reply_header = b"\xFF\xFF\xFF\xFF\x66\x0A"
    master_addr = (MASTER_HOST, MASTER_PORT)
    last = "0.0.0.0:0"
    sent = 0
    received = 0
    hosts = 0
    # The context manager closes the socket on every exit path, including
    # the generator being abandoned before it is exhausted.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.settimeout(timeout)
        while hosts < max_hosts:
            payload = b"".join((
                MASTER_PING,
                struct.pack("B", region & 0xFF),
                last.encode("ascii"), END_MARKER,
                filter_str.encode("ascii"), END_MARKER,
            ))
            try:
                s.sendto(payload, master_addr)
            except OSError as e:
                print(f"[master] send failed: {e}", file=sys.stderr)
                break
            sent += 1
            try:
                data, _ = s.recvfrom(4096)
            except socket.timeout:
                print("[master] Timeout or blocked by DS; stopping", file=sys.stderr)
                break
            except OSError as e:
                print(f"[master] receive failed: {e}", file=sys.stderr)
                break
            # Validate the full header: 0xFF 0xFF 0xFF 0xFF 0x66 0x0A
            if data[:6] != reply_header:
                print("[master] Unexpected reply header:", data[:6].hex(), file=sys.stderr)
                break
            received += 1
            # Each 6-byte record thereafter: 4 bytes IP, 2 bytes port (network order)
            next_seed = None
            for off in range(6, len(data) - 5, 6):
                ip = socket.inet_ntoa(data[off:off + 4])
                (port,) = struct.unpack_from("!H", data, off + 4)
                if ip == "0.0.0.0" and port == 0:
                    return
                yield (ip, port)
                hosts += 1
                if hosts >= max_hosts:
                    return
                next_seed = f"{ip}:{port}"
            if next_seed is None:
                # A reply without records gives no new seed; asking again
                # would repeat the identical request indefinitely.
                print("[master] Reply contained no addresses; stopping", file=sys.stderr)
                break
            # Seed the next request with the last address of this batch
            last = next_seed
    print(f"[master] got {hosts} hosts (sent: {sent}, recv: {received})", file=sys.stderr)
- ### A2S_INFO QUERY FUNCTION ###
def _info_cstring(data, offset):
    """Read the NUL-terminated UTF-8 string at offset; return (text, next_offset) or None if unterminated."""
    end = data.find(b"\x00", offset)
    if end < 0:
        return None
    return data[offset:end].decode("utf-8", errors="replace"), end + 1


def _parse_info_extras(data, offset, edf, info):
    """Decode the optional EDF-flagged fields into info; raise struct.error or ValueError when the data is truncated."""
    if edf & 0x80:  # server's game port
        (info["game_port"],) = struct.unpack_from("<H", data, offset)
        offset += 2
    if edf & 0x10:  # server's SteamID
        (info["steamid"],) = struct.unpack_from("<Q", data, offset)
        offset += 8
    if edf & 0x40:  # spectator port followed by spectator server name
        (info["spec_port"],) = struct.unpack_from("<H", data, offset)
        offset += 2
        parsed = _info_cstring(data, offset)
        if parsed is None:
            raise ValueError("unterminated spectator name")
        info["spec_name"], offset = parsed
    if edf & 0x20:  # keywords / tags
        parsed = _info_cstring(data, offset)
        if parsed is None:
            raise ValueError("unterminated keywords")
        info["keywords"], offset = parsed
    if edf & 0x01:  # 64-bit game id
        (info["gameid64"],) = struct.unpack_from("<Q", data, offset)


def _parse_info_reply(data, addr):
    """Decode an A2S_INFO 'I' reply (header and frame byte included) into a dict, or None if it is malformed."""
    offset = 5
    fields = []
    for index in range(4):  # name, map, folder, game
        parsed = _info_cstring(data, offset)
        if parsed is None:
            print(f"[info] {addr} unterm’d string field {index}", file=sys.stderr)
            return None
        text, offset = parsed
        fields.append(text)
    name, mapname, folder, gamename = fields

    # Fixed part: appid (uint16), players, max players, bots, server type,
    # environment, visibility, VAC -- nine bytes in total.
    fixed = struct.Struct("<HBBBccBB")
    if len(data) < offset + fixed.size:
        print(f"[info] {addr} malformed fixed fields", file=sys.stderr)
        return None
    appid_recv, players, maxp, bots, server_type, env, vis, vac = fixed.unpack_from(data, offset)
    offset += fixed.size

    # Then version\0. If the terminator is missing, keep what is there and
    # treat the packet as ending here instead of wrapping back to offset 0.
    parsed = _info_cstring(data, offset)
    if parsed is None:
        version = data[offset:].decode("utf-8", errors="replace")
        offset = len(data)
    else:
        version, offset = parsed

    info = {
        "name": name,
        "map": mapname,
        "folder": folder,
        "game": gamename,
        "appid": appid_recv,
        "players": players,
        "max_players": maxp,
        "bots": bots,
        "server_type": server_type.decode("ascii", errors="replace"),
        "environment": env.decode("ascii", errors="replace"),
        "password_protected": (vis != 0),
        "vac": bool(vac),
        "version": version,
        "address": addr,
    }

    # The Extra Data Flag byte and everything after it may be absent.
    if offset < len(data):
        edf = data[offset]
        try:
            _parse_info_extras(data, offset + 1, edf, info)
        except (struct.error, ValueError) as e:
            # Keep the mandatory fields already decoded; only the extras are lost.
            print(f"[info] {addr} truncated extra data: {e}", file=sys.stderr)
    return info


def query_info(addr, timeout=TIMOUT_SEC):
    """
    Send an A2S_INFO query to a game server and return its metadata.

    If the server answers with a challenge ('A' frame), the request is
    repeated with the 4 challenge bytes appended. At most three requests
    are sent so a server that keeps issuing challenges cannot stall the caller.

    Parameters:
        addr: ``(host, port)`` of the game server.
        timeout: Socket timeout in seconds for each reply.

    Returns:
        A dict with the keys ``name``, ``map``, ``folder``, ``game``,
        ``appid``, ``players``, ``max_players``, ``bots``, ``server_type``,
        ``environment``, ``password_protected``, ``vac``, ``version`` and
        ``address``, plus ``game_port``, ``steamid``, ``spec_port``,
        ``spec_name``, ``keywords`` and ``gameid64`` when the reply carries
        them. ``None`` on timeout, socket error or malformed reply; apart
        from a plain timeout, the reason is printed to stderr.
    """
    request = A2S_HEADER + A2S_INFO
    challenge = b""  # the first request carries no challenge
    max_attempts = 3
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.settimeout(timeout)
        try:
            # Connecting the UDP socket makes recv() accept datagrams from
            # this peer only.
            s.connect(addr)
        except OSError as e:
            print(f"[info] {addr} connect failed: {e}", file=sys.stderr)
            return None
        for _ in range(max_attempts):
            try:
                s.send(request + challenge)
                data = s.recv(1400)
            except socket.timeout:
                return None
            except OSError as e:
                print(f"[info] {addr} socket error: {e}", file=sys.stderr)
                return None
            if len(data) < 5:
                print(f"[info] {addr} short reply", file=sys.stderr)
                return None
            # Check base header
            if data[:4] != A2S_HEADER:
                print(f"[info] {addr} wrong reply header", file=sys.stderr)
                return None
            frame = data[4]
            # frame==0x41 ('A') -> challenge; frame==0x49 ('I') -> info
            if frame == 0x41:
                if len(data) < 9:
                    print(f"[info] {addr} broken challenge", file=sys.stderr)
                    return None
                # Echo the challenge bytes verbatim; no integer round trip needed.
                challenge = data[5:9]
                continue
            if frame == 0x49:
                return _parse_info_reply(data, addr)
            print(f"[info] {addr} got unknown frame 0x{frame:02x}", file=sys.stderr)
            return None
    print(f"[info] {addr} no info frame after {max_attempts} attempts", file=sys.stderr)
    return None
# These imports and constants repeat names already defined near the top of
# this file, with the same values.
import socket
import struct
import sys

A2S_HEADER = b"\xFF" * 4
A2S_PLAYER = b"\x55"  # ASCII 'U'
def _parse_player_list(data):
    """Decode an A2S_PLAYER 'D' reply (header and frame byte included) into a list of player dicts, stopping at the first truncated record."""
    if len(data) < 6:
        return []
    player_count = data[5]
    ptr = 6
    players = []
    for _ in range(player_count):
        if ptr >= len(data):
            break
        idx = data[ptr]
        name_end = data.find(b"\x00", ptr + 1)
        # After the name come a 4-byte score and a 4-byte float duration.
        if name_end < 0 or name_end + 9 > len(data):
            break
        name = data[ptr + 1:name_end].decode("utf-8", errors="replace")
        score, duration = struct.unpack_from("<lf", data, name_end + 1)
        ptr = name_end + 9
        players.append({
            "index": idx,
            "name": name,
            "score": score,
            "duration_sec": duration,
        })
    return players


def query_players(addr, timeout=TIMOUT_SEC):
    """
    Perform an A2S_PLAYER query to get index, name, score and duration for each player.

    The first request carries the challenge value 0xFFFFFFFF, which asks the
    server for a challenge. When the server answers with an 'A' (0x41) frame,
    its 4 challenge bytes are echoed back verbatim in the next request. Up to
    three requests are sent. Multi-packet and compressed replies are not
    supported.

    Parameters:
        addr: ``(host, port)`` of the game server.
        timeout: Socket timeout in seconds for each reply.

    Returns:
        A list of dicts with the keys ``index``, ``name``, ``score`` and
        ``duration_sec``. The list may hold fewer entries than the server
        announced if the packet is truncated. ``None`` on timeout, socket
        error, malformed reply or when no 'D' frame arrives; the reason is
        printed to stderr.
    """
    request = A2S_HEADER + A2S_PLAYER
    # Kept as raw bytes: converting the challenge to a signed integer makes
    # every challenge with the high bit set look like "no challenge yet".
    challenge = b"\xFF\xFF\xFF\xFF"
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.settimeout(timeout)
        try:
            s.connect(addr)
        except OSError as e:
            print(f"[player] connect failed: {e}", file=sys.stderr)
            return None
        for attempts in range(1, 4):
            try:
                s.send(request + challenge)
                data = s.recv(2048)
            except socket.timeout:
                print(f"[player] timeout on attempt {attempts}", file=sys.stderr)
                return None
            except OSError as e:
                print(f"[player] socket error: {e}", file=sys.stderr)
                return None
            if len(data) < 5 or data[:4] != A2S_HEADER:
                print("[player] malformed header", file=sys.stderr)
                return None
            frame = data[4]
            if frame == 0x41:  # 'A' -> server issued a challenge
                if len(data) < 9:
                    print("[player] broken challenge", file=sys.stderr)
                    return None
                challenge = data[5:9]
                continue
            if frame == 0x44:  # 'D' -> player list
                return _parse_player_list(data)
            print(f"[player] unexpected frame: 0x{frame:02x}", file=sys.stderr)
            return None
    print("[player] no 'D' frame received", file=sys.stderr)
    return None
def load_players(tree, tree_item_id, server):
    """
    Fill one server row of the tree with live data.

    Queries the server's info. When there is no answer or the server reports
    zero players, the row is deleted. Otherwise the row text becomes
    "(players/max) name - map", the player list is queried, and one child
    row per player is inserted, highest score first.

    Parameters:
        tree: Tree widget holding the row.
        tree_item_id: Id of the row that represents this server.
        server: ``(ip, port)`` address passed to the query functions.
    """
    def ascii_only(text):
        # Drop every character that cannot be encoded as ASCII.
        return text.encode("ascii", "ignore").decode("ascii", "ignore")

    info = query_info(server, timeout=TIMOUT_SEC)
    if not info or not info['players']:
        # No answer, or nobody is playing: remove the row.
        tree.delete(tree_item_id)
        return

    label = f"({info['players']}/{info['max_players']}) {ascii_only(info['name'])} - {info['map']}"
    tree.item(tree_item_id, text=label)

    roster = query_players(server, timeout=TIMOUT_SEC)
    if not roster:
        return
    ranked = sorted(roster, key=lambda entry: entry["score"], reverse=True)
    for entry in ranked:
        tree.insert(tree_item_id, "end", text=ascii_only(entry['name']))
        # Keep the server row expanded so the players are visible.
        tree.item(tree_item_id, open=True)
def load_servers(tree, callback):
    """
    List servers from the master server and load each one's details in parallel.

    For every address yielded by ``query_master`` a placeholder row
    "ip:port..." is inserted into the tree and a thread running
    ``load_players`` is started for it. After all started threads have
    finished, ``callback`` is invoked.

    Parameters:
        tree: Tree widget that receives the server rows.
        callback: Zero-argument callable run once loading has ended. It is
            also run when enumeration fails with an exception, so the caller
            is never left waiting; the exception still propagates afterwards.
    """
    workers = []
    try:
        for server in query_master(APP_ID, REGION, FILTER, max_hosts=MAX_HOSTS, timeout=TIMOUT_SEC):
            ip, port = server
            tree_item_id = tree.insert("", "end", text=f"{ip}:{port}...")
            worker = threading.Thread(target=load_players, args=(tree, tree_item_id, server))
            worker.start()
            workers.append(worker)
    finally:
        # Wait for every started worker before signalling completion;
        # join() returns immediately for threads that already ended.
        for worker in workers:
            worker.join()
        callback()
def refresh(btn, tree, thread):
    """
    Clear the tree and start a new background server load.

    Does nothing while ``thread`` is still alive. Otherwise every top-level
    row is removed from the tree, the button is disabled, and a new thread
    running ``load_servers`` is started; that thread re-enables the button
    through ``enable_refresh`` when it is done.

    Parameters:
        btn: Refresh button to disable while loading.
        tree: Tree widget to clear and refill.
        thread: Thread object whose ``is_alive()`` result gates the refresh.
    """
    if thread.is_alive():
        return
    for row in tree.get_children():
        tree.delete(row)
    loader = threading.Thread(target=load_servers, args=(tree, lambda: enable_refresh(btn)))
    btn.config(state="disabled")
    loader.start()
def enable_refresh(btn):
    """Put the given button back into its "normal" (clickable) state."""
    btn.config(state="normal")
if __name__ == "__main__":
    # Main window: a tree of servers above a Refresh button.
    root = tk.Tk()
    root.title("Chiv Servers")
    root.geometry("400x500")

    tree = ttk.Treeview(root, show="tree")
    tree.pack(fill="both", expand=True)

    # The lambdas look up `btn` when they are called, so the thread object
    # can be created before the button exists.
    thread = threading.Thread(target=load_servers, args=(tree, lambda: enable_refresh(btn)))

    # The button starts disabled; the initial load re-enables it when done.
    btn = tk.Button(
        root,
        text="Refresh",
        state="disabled",
        command=lambda: refresh(btn, tree, thread),
    )
    btn.pack()

    thread.start()
    root.mainloop()
Advertisement
Add Comment
Please, Sign In to add comment