Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import sys
- import base64
- import json
- import urllib.request
- import io
- import csv
- import time
- import operator
- import os
- import subprocess
- # server example:
- # {"battlEye":true,"firstPersonOnly":false,"shard":"private","timeAcceleration":4,"time":"2019-08-24T10:17:00","mods":[{"name":"Summer_Chernarus","steamWorkshopId":1644467354},{"name":"BuildAnywhere","steamWorkshopId":1574054508},{"name":"SIX-DayZ-Auto-Run","steamWorkshopId":1781132597}],"sponsor":false,"profile":false,"nameOverride":false,"endpoint":{"ip":"74.89.233.235","port":27900},"name":"Dystopia |Event/Dev Server|Discord.gg/kUfqxpX ","map":"chernarusplus","players":0,"maxPlayers":60,"environment":"w","password":true,"version":"1.04.152166","vac":true,"gamePort":2402}
- #=== Configuration ===
- cfg_dzsaServerListUrl = "https://dayzsalauncher.com/api/v1/launcher/servers/dayz"
- cfg_dzsaServerUrl = "https://dayzsalauncher.com/api/v1/query/{ip}/{steamQueryPort}" # eg. 194.26.183.65/27016
- # Alternative servers source: Absolute path to dzsa launcher's dayz-servers.json file
- cfg_dzsaServersFile = "C:/Users/lukas_000/Documents/dzsalauncher/dayz-servers.json"
- # Relative path to servers file (cached results):
- cfg_serversFile = "servers.json"
- cfg_modsFile = "mods.json"
- cfg_cacheExpireSeconds = 60 * 20 # Fetch freshly if local data is outdated (20 min)
- # Relative path to the log file:
- cfg_logFile = "log.txt"
- # Relative path to the filtered and ranked serverlist file:
- cfg_rankedServersFile = "ranked-servers.txt"
- cfg_pingMaxRetries = 1
- #--- Reduce spam in output
- cfg_serverOutputHiddenKeys = ["sponsor", "profile", "nameOverride", "environment"]
- cgf_serverOutputSimpleModlist = True
- cfg_printServersLimit = 20
- #--- Filters ---
- cfg_filter_minMaxPlayers = 60
- cfg_filter_minPlayers = 1
- cfg_filter_serverName = "" # contains, ignore case
- cfg_filter_match = {"battlEye": True, "vac": True, "password": False, "map": "chernarusplus"}
- # each entry is a list of aliases for that mod (eg. list of mods that are equally valid to fulfill that entry)
- cfg_filter_requiredMods = [["DisableBaseDestruction"],["PartyMe"],["Base Furniture Mods"],["VanillaPlusPlusMap"],["Unlimited Stamina","UnlimitedStamina"],["Code Lock"],["Trader"]]
- # Blacklisted mods
- cfg_filter_blacklistedMods = ["C4BaseRaid", "Base-Raiding Breachingcharge", "Breachingcharge", "BaseRaidToolsV2", "BaseRaidTools"]
- # mod name : sorting weight (negative also possible) TODO unused currently
- cfg_filter_optionalMods = {"NoVehicleDamage":1, "DayZ-Auto-Run":1, "SIX-DayZ-Auto-Run":1, "SQUAD MSF-C":1, "Banking":1, "OP_BaseItems":1, "BaseBuildingPlus":1, "BaseBuildingLogs":1, "Server_Information_Panel":1, "MoreGuns":1, "BulletStacksPlusPlus":1, "MasssManyItemOverhaul":1, "Fast Access - Code Lock":1, "BuildAnywhere":1, "GoreZ":1}
- cfg_filter_max_ping = 1000 # in ms TODO unused currently
- #=== End of configuration ===
- # Mirror program output to log file:
- class Logger(object):
- def __init__(self):
- self.terminal = sys.stdout
- self.log = open(cfg_logFile, "w", encoding="utf-8")
- def write(self, message):
- self.terminal.write(message)
- self.log.write(message)
- def flush(self):
- self.terminal.flush();
- self.log.flush()
- # Replace stdout to support unicode strings
- sys.stdout = open(1, 'w', encoding='utf-8', closefd=False); # fd 1 is stdout
- sys.stdout = Logger() # mirror to log file
- def openUrl(method, url, data = None):
- if data:
- dataEnc = data.encode()
- else:
- dataEnc = data
- request = urllib.request.Request(url, dataEnc)
- request.add_header("Content-Type", "application/json")
- request.add_header("Accept", "application/json")
- request.add_header("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:68.0) Gecko/20100101 Firefox/68.0") # dummy
- request.get_method = lambda: method
- try:
- response = urllib.request.urlopen(request)
- respData = response.read().decode("utf-8")
- except urllib.error.HTTPError as error:
- error_details = error.read();
- error_message = "HTTP ERROR: %s %s" % (error.code, error.reason)
- print("%s" % error_message, flush=True)
- sys.exit(error_message)
- if respData:
- result = json.load(io.StringIO(respData))
- else:
- result = []
- return result
- def fetchDZSAServers():
- print("Fetching serverlist from '" + cfg_dzsaServerListUrl + "' ...", flush=True)
- start = time.perf_counter()
- response = openUrl("GET", cfg_dzsaServerListUrl)
- servers = response["result"]
- duration = time.perf_counter() - start
- print("Fetched " + str(len(servers)) + " servers in %.2f seconds." % duration, flush=True)
- return servers
- def loadJsonFile(filePath):
- start = time.perf_counter()
- with open(filePath, encoding="utf-8") as jsonFile:
- jsonData = json.load(jsonFile);
- duration = time.perf_counter() - start
- return jsonData, duration
- def writeJsonFile(filePath, jsonData, pretty=False):
- start = time.perf_counter()
- with open(filePath, "w", encoding="utf-8") as jsonFile:
- if pretty:
- json.dump(jsonData, jsonFile, sort_keys=True, ensure_ascii=False, indent=4)
- else:
- json.dump(jsonData, jsonFile, sort_keys=True, ensure_ascii=False)
- duration = time.perf_counter() - start
- return duration
- # Loads from dzsa launcher's file
- def loadDZSAServers():
- print("Loading serverlist from file '" + cfg_dzsaServersFile + "' ...", flush=True)
- servers, duration = loadJsonFile(cfg_dzsaServersFile)
- print("Loaded " + str(len(servers)) + " servers in %.2f seconds." % duration, flush=True)
- return servers
- def loadServers():
- print("Loading serverlist from file '" + cfg_serversFile + "' ...", flush=True)
- servers, duration = loadJsonFile(cfg_serversFile)
- print("Loaded " + str(len(servers)) + " servers in %.2f seconds." % duration, flush=True)
- return servers
- def writeServers(servers):
- print("Writing serverlist to file '" + cfg_serversFile + "' ...", flush=True)
- duration = writeJsonFile(cfg_serversFile, servers, pretty=True)
- print("Serverlist written to file in %.2f seconds." % duration, flush=True)
- def loadOrFetchServers():
- # Check local cache:
- if os.path.isfile(cfg_serversFile):
- localServerlistAge = time.time() - os.path.getmtime(cfg_serversFile)
- print("Age of local serverlist: %.2f seconds, Expiration: %.2f seconds" % (localServerlistAge, cfg_cacheExpireSeconds), flush=True)
- if abs(localServerlistAge) < cfg_cacheExpireSeconds:
- return loadServers(), False # still valid
- else:
- print(" Local serverlist is expired.", flush=True)
- # Else: Fetch from DZSA API and save for later re-use
- servers = fetchDZSAServers()
- writeServers(servers)
- return servers, True # freshly fetched
- def writeRankedServers(rankedServers):
- print("Writing ranked serverlist to file '" + cfg_rankedServersFile + "' ...", flush=True)
- duration = writeJsonFile(cfg_rankedServersFile, rankedServers, pretty=True)
- print("Ranked serverlist written to file in %.2f seconds." % duration, flush=True)
- def loadMods():
- print("Loading modlist from file '" + cfg_modsFile + "' ...", flush=True)
- mods, duration = loadJsonFile(cfg_modsFile)
- print("Loaded " + str(len(mods)) + " mods in %.2f seconds." % duration, flush=True)
- return mods
- def writeMods(mods):
- print("Writing mods to file '" + cfg_modsFile + "' ...", flush=True)
- duration = writeJsonFile(cfg_modsFile, mods, pretty=True)
- print("Mods written to file in %.2f seconds." % duration, flush=True)
- def parseMods(servers):
- print("Parsing mods ...", flush=True)
- start = time.perf_counter()
- # Each key (mod name) is mapped to a list of workshop ids (usually of size 1)
- mods = {} # each
- for server in servers:
- serverMods = server["mods"]
- for serverMod in serverMods:
- modName = serverMod["name"]
- workshopId = serverMod["steamWorkshopId"]
- if modName not in mods:
- mods[modName] = [workshopId]
- else:
- workshopIds = mods[modName]
- if workshopId not in workshopIds:
- workshopIds.append(workshopId)
- print("Found different workshop ids for mod: " + modName)
- duration = time.perf_counter() - start
- print("Parsed " + str(len(mods)) + " mods in %.2f seconds." % duration, flush=True)
- return mods
- def loadOrParseMods(servers, forceParse=False):
- # Freshly parse mods if forced (eg. after the servers got refreshed) or if the local modlist is missing
- if forceParse or not os.path.isfile(cfg_modsFile):
- # Parse and save mod list:
- mods = parseMods(servers)
- writeMods(mods)
- return mods, True # freshly parsed
- else:
- return loadMods(), False # Loaded from cache
- # server example:
- # {"battlEye":true,"firstPersonOnly":false,"shard":"private","timeAcceleration":4,"time":"2019-08-24T10:17:00","mods":[{"name":"Summer_Chernarus","steamWorkshopId":1644467354},{"name":"BuildAnywhere","steamWorkshopId":1574054508},{"name":"SIX-DayZ-Auto-Run","steamWorkshopId":1781132597}],"sponsor":false,"profile":false,"nameOverride":false,"endpoint":{"ip":"74.89.233.235","port":27900},"name":"Dystopia |Event/Dev Server|Discord.gg/kUfqxpX ","map":"chernarusplus","players":0,"maxPlayers":60,"environment":"w","password":true,"version":"1.04.152166","vac":true,"gamePort":2402}
- def serverFilter(server):
- # Min max players:
- maxPlayers = server["maxPlayers"]
- if maxPlayers < cfg_filter_minMaxPlayers: return False
- # Min players:
- players = server["players"]
- if players < cfg_filter_minPlayers: return False
- # Server name filter:
- serverName = server["name"]
- if cfg_filter_serverName not in serverName.lower(): return False
- # Data matching:
- for key, value in cfg_filter_match.items():
- if not (server[key] == value): return False
- # Required mods:
- mods = [mod["name"] for mod in server["mods"]]
- for requiredModAliases in cfg_filter_requiredMods:
- found = False
- for requiredMod in requiredModAliases:
- if requiredMod in mods:
- found = True
- break
- if not found: return False # Missing required mod
- # Blacklisted mods:
- for blacklistedMods in cfg_filter_blacklistedMods:
- if blacklistedMods in mods: return False
- return True # Passed all filters
- def filterServers(servers):
- print("Filtering " + str(len(servers)) + " servers ...", flush=True)
- filteredServers = list(filter(serverFilter, servers))
- print("Found " + str(len(filteredServers)) + " matching servers.", flush=True)
- return filteredServers
- def serverRank(server):
- serverValue = 0
- mods = [mod["name"] for mod in server["mods"]]
- for optionalMod, modValue in cfg_filter_optionalMods.items():
- if optionalMod in mods:
- serverValue+=modValue
- return serverValue
- def rankServers(filteredServers):
- print("Ranking servers ...", flush=True)
- rankedServers = [{"server": server, "ranking-value": serverRank(server)} for server in filteredServers]
- rankedServers.sort(key=lambda x: x["ranking-value"], reverse=True)
- print("Servers ranked.", flush=True)
- return rankedServers
- # Note: This is very slow! Limit number of calls
- # Latency parsed from ping command in ms, or 9999 if unreachable
- # TODO inaccurate? Result latency is usually slightly better (~30ms) than what is displayed inside the launcher
- def ping(host):
- return _ping(host)
- def _ping(host, attempt=1):
- # TODO ugly
- output = subprocess.Popen(["ping", "-n", "1", host], stdout=subprocess.PIPE).communicate()[0].decode("utf-8", "ignore") # ignore decoding errors
- try:
- return int(output.split("ms")[0].split("=")[-1])
- except Exception as e:
- if attempt < cfg_pingMaxRetries:
- # retry:
- return _ping(host, attempt=attempt+1)
- else:
- return 9999 # eg. unreachable
- def formatServer(server):
- formattedServer = server.copy() # shallow copy
- # Replace mod entries with simple list of mods names:
- if cgf_serverOutputSimpleModlist:
- formattedServer["mods"] = [mod["name"] for mod in server["mods"]]
- # Removed hidden details:
- for hiddenKey in cfg_serverOutputHiddenKeys:
- del formattedServer[hiddenKey]
- return formattedServer
- def toJsonString(jsonObject, pretty=True):
- if pretty:
- return json.dumps(jsonObject, sort_keys=True, ensure_ascii=False, indent=4)
- else:
- return json.dumps(jsonObject, sort_keys=True, ensure_ascii=False)
- def printRankedServers(rankedServers, number=cfg_printServersLimit):
- for i in range(number):
- if i >= len(rankedServers): break
- rankedServer = rankedServers[i]
- server = rankedServer["server"]
- rankingValue = rankedServer["ranking-value"]
- serverPing = ping(server["endpoint"]["ip"])
- print(str(i + 1) + ") ranking-value=" + str(rankingValue) + " , ping: " + str(serverPing) + " ms")
- print(toJsonString(formatServer(server), pretty=True), flush=True)
- if len(rankedServers) > number:
- print("... (there are more servers fulfilling the filter rules)", flush=True)
- def main():
- servers, serversFetched = loadOrFetchServers()
- mods, modsParsed = loadOrParseMods(servers, forceParse=serversFetched)
- # TODO print info about the configured filters
- filteredServers = filterServers(servers)
- rankedServers = rankServers(filteredServers)
- #writeRankedServers(rankedServers)
- printRankedServers(rankedServers, number=cfg_printServersLimit)
- #latency = pingLatency("195.201.174.52")
- #print(str(latency))
- print("Done.", flush=True)
- if __name__ == '__main__':
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement