Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/home/winston/venv/ping_servers/bin/python
- # pip3 install nbt mcstatus tabulate
- """
- Ping servers in the servers.dat
- Works on Mac, should work on Windows and Linux.
- """
- from nbt.nbt import NBTFile
- import os
- from mcstatus import MinecraftServer
- from collections import namedtuple
- import platform
- import multiprocessing
- from tabulate import tabulate
# Lightweight record types passed between the pinger and the formatters.
Server = namedtuple('Server', 'name ip port')
PingResult = namedtuple('PingResult', 'server status error')

# Plain stand-ins for the ping response: the response object itself triggers
# a Pickler error when it is sent back from a worker process.
Status = namedtuple('Status', 'players latency motd version')
Players = namedtuple('Players', 'online max')
# Game directory for each value of platform.system(), relative to the base
# directory chosen in get_minecraft_dir().
MINECRAFT_DIRS = {
    'Darwin': 'Library/Application Support/minecraft',
    'Linux': '.minecraft',
    'Windows': '.minecraft',
}


def get_minecraft_dir():
    """Return the path of the Minecraft game directory for this platform.

    The base directory is the user's home directory, except on Windows where
    ``%APPDATA%`` is preferred when it is set.  Platforms that are not listed
    in ``MINECRAFT_DIRS`` fall back to ``.minecraft`` under the home
    directory instead of raising ``KeyError``.

    Returns:
        str: the game directory path; it is not checked for existence.
    """
    system = platform.system()
    subdir = MINECRAFT_DIRS.get(system, '.minecraft')
    # expanduser() honours $HOME when it is set but still produces a string
    # when it is not, whereas os.getenv('HOME') would hand None to join().
    home = os.path.expanduser('~')
    if system == 'Windows':
        base = os.getenv('APPDATA') or home
    else:
        base = home
    return os.path.join(base, subdir)
def get_servers():
    """Yield a ``Server`` for every entry in the client's ``servers.dat``.

    The file is opened by its full path inside the directory returned by
    ``get_minecraft_dir()``, so the process working directory is left
    untouched.

    Yields:
        Server: name, host and port of each saved server; the port falls
        back to the default chosen by ``get_host_port``.

    Raises:
        OSError: if ``servers.dat`` cannot be opened.
    """
    servers_path = os.path.join(get_minecraft_dir(), 'servers.dat')
    with open(servers_path, 'rb') as buf:
        nbt_root = NBTFile(buffer=buf)
        for entry in nbt_root['servers']:
            name = entry['name'].valuestr()
            address = entry['ip'].valuestr()
            # The stored "ip" field may carry an optional ":port" suffix.
            ip, port = get_host_port(address)
            yield Server(name=name, ip=ip, port=port)
def get_host_port(address, default=25565):
    """Split a server address into ``(host, port)``.

    Accepted forms:
        * ``host`` or ``host:port``
        * ``[ipv6]`` or ``[ipv6]:port``
        * a bare IPv6 literal (more than one colon, no brackets), which is
          returned unchanged together with the default port

    Args:
        address: address string; surrounding spaces and tabs are ignored.
        default: port to use when the address does not specify one.

    Returns:
        tuple: ``(host, port)`` with ``port`` as an ``int``.

    Raises:
        ValueError: if a port is present but is not a valid integer.
    """
    address = address.strip(' \t')
    if address.startswith('['):
        # Bracketed IPv6 literal, optionally followed by ":port".
        host, _, rest = address[1:].partition(']')
        port = rest.lstrip(':')
    elif address.count(':') == 1:
        host, _, port = address.partition(':')
    else:
        # Empty string, plain host name, or un-bracketed IPv6 literal.
        return address, default
    # An empty port ("host:" or "[::1]") means "use the default".
    return host, (int(port) if port else default)
def ping_server(server):
    """Query one server's status and wrap the outcome in a ``PingResult``.

    On success the result carries a ``Status`` and ``error`` is ``None``;
    if an ``IOError`` is raised the result carries ``status=None`` and the
    exception instead.
    """
    target = MinecraftServer(server.ip, server.port)
    try:
        response = target.status()
        # Copy the interesting fields into plain namedtuples: the response
        # object itself causes a Pickler error.
        summary = Status(
            players=Players(response.players.online, response.players.max),
            latency=response.latency,
            motd=response.description,
            version=response.version.name,
        )
    except IOError as failure:
        return PingResult(server, None, failure)
    return PingResult(server, summary, None)
def ping_servers(servers):
    """Lazily ping each server in turn, yielding one ``PingResult`` apiece."""
    yield from (ping_server(candidate) for candidate in servers)
def ping_servers_async(servers):
    """Ping all servers in parallel using a pool of 10 worker processes.

    Results are yielded in the same order as ``servers``.  The pool is used
    as a context manager so its worker processes are shut down once the
    results have been collected, instead of being left running.

    Args:
        servers: iterable of ``Server`` records.

    Yields:
        PingResult: one result per input server.
    """
    with multiprocessing.Pool(10) as pool:
        # map() blocks until every result is available, so the pool can be
        # torn down before the results are handed to the caller.
        results = pool.map(ping_server, servers)
    yield from results
def format_address(ip, port, default_port=25565):
    """Render a host and port for display, hiding the default port.

    Args:
        ip: host name or IP literal.
        port: port number (anything accepted by ``int()``).
        default_port: port that is omitted from the output.

    Returns:
        str: ``ip`` alone when ``port`` equals ``default_port``; otherwise
        ``host:port``, with IPv6 literals wrapped in square brackets so the
        port separator stays unambiguous.
    """
    if int(port) == default_port:
        return ip
    # A colon inside the host marks an IPv6 literal.
    host = '[{0}]'.format(ip) if ':' in ip else ip
    return '{0}:{1}'.format(host, port)
def format_server(s):
    """Describe one ``PingResult`` as a single human-readable sentence."""
    label = '{0} ({1})'.format(
        s.server.name,
        format_address(s.server.ip, s.server.port),
    )
    status = s.status
    if status:
        summary = '{v}, has {online}/{cap} players, ~{lag} ms lag.'.format(
            v=status.version,
            online=status.players.online,
            cap=status.players.max,
            lag=round(status.latency),
        )
    else:
        # No status means the ping did not succeed.
        summary = 'is offline.'
    return ' '.join([label, summary])
def tabulate_servers(servers):
    """Build a text table with one row per ``PingResult``.

    Servers without a status get empty cells in the four status columns.
    """
    columns = ['Name', 'Address', 'Version', 'Online', 'Max', 'Lag (ms)']
    rows = []
    for result in servers:
        row = [
            result.server.name,
            format_address(result.server.ip, result.server.port),
        ]
        status = result.status
        if status:
            row += [
                status.version,
                status.players.online,
                status.players.max,
                round(status.latency),
            ]
        else:
            row += [None] * 4
        rows.append(row)
    return tabulate(rows, headers=columns)
def main():
    """Ping every saved server in parallel and print the results table."""
    saved_servers = get_servers()
    table = tabulate_servers(ping_servers_async(saved_servers))
    print(table)


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement