Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- import collections
- import configparser
- import csv
- import io
- import os.path
- import re
- import select
- import socket
- # MODIFICADO: Cambiado para usar la librería 'packaging' en lugar de la obsoleta 'distutils'
- from packaging.version import Version
- import logging
- # This is the v19 version of the server. It is not compatible with earlier versions of the script
- # MODIFICADO: Adaptado para el sistema de multijugador en el mapa.
- HOST = r"0.0.0.0"
- PORT = 9999
- PBS_DIR = r"./PBS"
- LOG_DIR = r"."
- RULES_DIR = "./OnlinePresets"
- # Aprox. in seconds
- RULES_REFRESH_RATE = 60
- # MODIFICADO: Usando Version de 'packaging'
- GAME_VERSION = Version("1.0.0")
- POKEMON_MAX_NAME_SIZE = 10
- PLAYER_MAX_NAME_SIZE = 10
- MAXIMUM_LEVEL = 100
- IV_STAT_LIMIT = 31
- EV_LIMIT = 510
- EV_STAT_LIMIT = 252
- # Moves that permanently copy other moves
- SKETCH_MOVE_IDS = ["SKETCH"]
- EBDX_INSTALLED = False
- class Server:
- def __init__(self, host, port, pbs_dir, rules_dir):
- self.valid_party = make_party_validator(pbs_dir)
- self.loop_count = 1
- _,self.rules_files = find_changed_files(rules_dir,{})
- self.rules = load_rules_files(rules_dir,self.rules_files)
- self.host = host
- self.port = port
- self.rules_dir = rules_dir
- self.socket = None
- self.clients = {}
- self.handlers = {
- Connecting: self.handle_login_request,
- LoggedIn: self.handle_loggedin,
- }
- self.socket_map = {}
- def run(self):
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as self.socket:
- self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.socket.bind((self.host, self.port))
- logging.info('Started Server on %s:%d', self.host, self.port)
- self.socket.listen()
- while True:
- try:
- self.loop()
- except KeyboardInterrupt:
- logging.info('Stopping Server')
- break
- def broadcast_to_map(self, message, map_id, exclude_socket=None):
- for s, st in self.clients.items():
- if s is not exclude_socket and isinstance(st.state, LoggedIn) and st.state.map_id == map_id:
- #st.send_buffer += message + b"\n"
- st.send_buffer += message.encode('utf-8') + b"\n"
- def loop(self):
- if (self.loop_count % RULES_REFRESH_RATE) == 0:
- reload_rules,rule_files = find_changed_files(self.rules_dir,self.rules_files)
- if reload_rules:
- self.rules_files = rule_files
- self.rules = load_rules_files(self.rules_dir,self.rules_files)
- self.loop_count = 0
- self.loop_count += 1
- reads = list(self.clients)
- reads.append(self.socket)
- writes = [s for s, st in self.clients.items() if st.send_buffer]
- readable, writeable, errors = select.select(reads, writes, reads, 1.0)
- for s in errors:
- if s is self.socket:
- raise Exception("error on listening socket")
- else:
- self.disconnect(s)
- for s in writeable:
- st = self.clients.get(s)
- if not st: continue
- try:
- n = s.send(st.send_buffer)
- except Exception as e:
- self.disconnect(s, e)
- else:
- st.send_buffer = st.send_buffer[n:]
- for s in readable:
- if s is self.socket:
- s, address = self.socket.accept()
- s.setblocking(False)
- st = self.clients[s] = State(address)
- client_id = s.fileno()
- self.socket_map[client_id] = s
- logging.info('%s: connect (assigned id %d)', st, client_id)
- else:
- st = self.clients.get(s)
- if not st: continue
- try:
- recvd = s.recv(4096)
- except ConnectionResetError as e:
- self.disconnect(s)
- else:
- if recvd:
- recv_buffer = st.recv_buffer + recvd
- while True:
- message, _, recv_buffer = recv_buffer.partition(b"\n")
- if not _:
- st.recv_buffer = message
- break
- else:
- try:
- self.handlers[type(st.state)](s, st, message)
- except Exception as e:
- logging.exception('Server Error', exc_info=e)
- self.disconnect(s, "server error")
- else:
- self.disconnect(s, "client disconnected")
- def disconnect(self, s, reason="unknown error"):
- client_id = s.fileno()
- if client_id in self.socket_map:
- del self.socket_map[client_id]
- st = self.clients.pop(s, None)
- if st:
- if isinstance(st.state, LoggedIn) and st.state.map_id != -1:
- writer = RecordWriter()
- writer.str("player_left")
- writer.int(client_id)
- self.broadcast_to_map(writer.line(), st.state.map_id, exclude_socket=s)
- try:
- writer = RecordWriter()
- writer.str("disconnect")
- writer.str(reason)
- writer.send_now(s)
- s.close()
- except Exception:
- pass
- logging.info('%s: disconnected (%s)', st, reason)
- def handle_login_request(self, s, st, message):
- client_id = s.fileno()
- record = RecordParser(message.decode("utf8"))
- if record.str() != "login":
- self.disconnect(s, "bad assert: expected login")
- return
- version = record.str()
- # MODIFICADO: Usando Version de 'packaging' para la comparación
- if not Version(version) >= GAME_VERSION:
- self.disconnect(s, "invalid version")
- return
- name = record.str()
- id_ = record.int()
- ttype = record.str()
- logging.debug('%s: Login attempt from %s (public_id: %d)', st, name, public_id(id_))
- if not self.valid_party(record):
- self.disconnect(s, "invalid party")
- return
- party_data = record.raw_all()
- map_id, x, y, direction = -1, -1, -1, -1
- st.state = LoggedIn(name, id_, ttype, party_data, map_id, x, y, direction)
- writer = RecordWriter()
- writer.str("login_ok")
- writer.int(client_id)
- writer.send(st)
- logging.info('%s: %s logged in successfully', st, name)
- def handle_loggedin(self, s, st, message):
- client_id = s.fileno()
- record = RecordParser(message.decode("utf8"))
- msg_type = record.str()
- if msg_type == "move":
- map_id = record.int()
- x = record.int()
- y = record.int()
- direction = record.int()
- first_move = st.state.map_id == -1
- # MODIFICADO: Usar el método _replace para actualizar el estado inmutable.
- # Esto es compatible con versiones de Python anteriores a 3.8.
- st.state = st.state._replace(
- map_id=map_id,
- x=x,
- y=y,
- direction=direction
- )
- if first_move:
- for other_s, other_st in self.clients.items():
- if s is not other_s and isinstance(other_st.state, LoggedIn) and other_st.state.map_id == map_id:
- writer = RecordWriter()
- writer.str("player_joined")
- writer.int(other_s.fileno())
- writer.int(other_st.state.map_id)
- writer.str(other_st.state.name)
- writer.str(other_st.state.trainertype)
- writer.int(other_st.state.x)
- writer.int(other_st.state.y)
- writer.int(other_st.state.direction)
- writer.raw(other_st.state.party)
- writer.send(st)
- writer = RecordWriter()
- if first_move:
- writer.str("player_joined")
- writer.int(client_id)
- writer.int(map_id)
- writer.str(st.state.name)
- writer.str(st.state.trainertype)
- writer.int(x)
- writer.int(y)
- writer.int(direction)
- writer.raw(st.state.party)
- else:
- writer.str("player_moved")
- writer.int(client_id)
- writer.int(map_id)
- writer.int(x)
- writer.int(y)
- writer.int(direction)
- self.broadcast_to_map(writer.line(), map_id, exclude_socket=s)
- elif msg_type == "request_interaction":
- target_id = record.int()
- target_socket = self.socket_map.get(target_id)
- if target_socket:
- writer = RecordWriter()
- writer.str("interaction_request")
- writer.int(client_id)
- writer.str(st.state.name)
- target_st = self.clients.get(target_socket)
- if target_st:
- writer.send(target_st)
- elif msg_type == "accept_interaction":
- target_id = record.int()
- target_socket = self.socket_map.get(target_id)
- if target_socket:
- writer = RecordWriter()
- writer.str("interaction_accepted")
- writer.int(client_id)
- # AÑADE el nombre aquí:
- writer.str(st.state.name)
- target_st = self.clients.get(target_socket)
- if target_st:
- writer.send(target_st)
- def write_server_rules(self,writer):
- writer.int(len(self.rules))
- for r in self.rules:
- writer.raw(r)
- class State:
- def __init__(self, address):
- self.address = address
- self.state = Connecting()
- self.send_buffer = b""
- self.recv_buffer = b""
- def __str__(self):
- state_name = type(self.state).__name__.lower()
- if isinstance(self.state, LoggedIn):
- return f"{self.address[0]}:{self.address[1]}/{state_name} ({self.state.name})"
- return f"{self.address[0]}:{self.address[1]}/{state_name}"
- Connecting = collections.namedtuple('Connecting', '')
- # MODIFICADO: Eliminado el argumento 'mutable=True' para compatibilidad con Python < 3.8
- LoggedIn = collections.namedtuple('LoggedIn', 'name id trainertype party map_id x y direction')
- class RecordParser:
- def __init__(self, line):
- self.fields = []
- field = ""
- escape = False
- for c in line:
- if c == "," and not escape:
- self.fields.append(field)
- field = ""
- elif c == "\\" and not escape:
- escape = True
- else:
- field += c
- escape = False
- self.fields.append(field)
- self.fields.reverse()
- def bool(self):
- return {'true': True, 'false': False}[self.fields.pop()]
- def bool_or_none(self):
- return {'true': True, 'false': False, '': None}[self.fields.pop()]
- def int(self):
- return int(self.fields.pop())
- def int_or_none(self):
- field = self.fields.pop()
- if not field: return None
- else: return int(field)
- def str(self):
- return self.fields.pop()
- def raw_all(self):
- data = list(reversed(self.fields))
- self.fields = []
- return data
- def public_id(id_):
- return id_ & 0xFFFF
- class RecordWriter:
- def __init__(self):
- self.fields = []
- def line(self):
- return ",".join(RecordWriter.escape(str(f)) for f in self.fields)
- def send_now(self, s):
- line = self.line() + "\n"
- s.sendall(line.encode("utf8"))
- def send(self, st):
- line = self.line() + "\n"
- st.send_buffer += line.encode("utf8")
- @staticmethod
- def escape(f):
- return f.replace("\\", "\\\\").replace(",", "\\,")
- def int(self, i):
- self.fields.append(str(i))
- def str(self, s):
- self.fields.append(s)
- def raw(self, fs):
- self.fields.extend(fs)
- Pokemon = collections.namedtuple('Pokemon', 'genders abilities moves forms')
- class Universe:
- def __contains__(self, item):
- return True
- def make_party_validator(pbs_dir):
- # ... (Esta función no necesita cambios)
- ability_syms = set()
- move_syms = set()
- item_syms = set()
- pokemon_by_name = {}
- with io.open(os.path.join(pbs_dir, r'abilities.txt'), 'r', encoding='utf-8-sig') as abilities_pbs:
- for row in csv.reader(abilities_pbs):
- if len(row) >= 2:
- ability_syms.add(row[1])
- with io.open(os.path.join(pbs_dir, r'moves.txt'), 'r', encoding='utf-8-sig') as moves_pbs:
- for row in csv.reader(moves_pbs):
- if len(row) >= 2:
- move_syms.add(row[1])
- with io.open(os.path.join(pbs_dir, r'items.txt'), 'r', encoding='utf-8-sig') as items_pbs:
- for row in csv.reader(items_pbs):
- if len(row) >= 2:
- item_syms.add(row[1])
- with io.open(os.path.join(pbs_dir, r'pokemon_server.txt'), 'r', encoding='utf-8-sig') as pokemon_pbs:
- pokemon_pbs_ = configparser.ConfigParser()
- pokemon_pbs_.read_file(pokemon_pbs)
- for section in pokemon_pbs_.sections():
- species = pokemon_pbs_[section]
- if 'forms' in species:
- forms = {int(f) for f in species['forms'].split(',') if f}
- else:
- forms = Universe()
- genders = {
- 'AlwaysMale': {0},
- 'AlwaysFemale': {1},
- 'Genderless': {2},
- }.get(species['gender_ratio'], {0, 1})
- ability_names = species['abilities'].split(',')
- abilities = {a for a in ability_names if a}
- moves = {m for m in species['moves'].split(',') if m}
- pokemon_by_name[section] = Pokemon(genders, abilities, moves, forms)
- def validate_party(record):
- logging.debug('--BEGIN PARTY VALIDATION--')
- errors = []
- try:
- for _ in range(record.int()):
- def validate_pokemon():
- species = record.str()
- species_ = pokemon_by_name.get(species)
- if species_ is None:
- logging.debug('invalid species: %s', species)
- errors.append("invalid species")
- logging.debug('Species: %s', species)
- level = record.int()
- if not (1 <= level <= MAXIMUM_LEVEL):
- logging.debug('invalid level: %d', level)
- errors.append("invalid level")
- personal_id = record.int()
- owner_id = record.int()
- if owner_id & ~0xFFFFFFFF:
- logging.debug('invalid owner id: %d', owner_id)
- errors.append("invalid owner id")
- owner_name = record.str()
- if not (len(owner_name) <= PLAYER_MAX_NAME_SIZE):
- logging.debug('invalid owner name: %s', owner_name)
- errors.append("invalid owner name")
- owner_gender = record.int()
- if owner_gender not in {0, 1}:
- logging.debug('invalid owner gender: %d', owner_gender)
- errors.append("invalid owner gender")
- exp = record.int()
- form = record.int()
- if form not in species_.forms:
- logging.debug('invalid form: %d', form)
- errors.append("invalid form")
- item = record.str()
- if item and item not in item_syms:
- logging.debug('invalid item id: %s', item)
- errors.append("invalid item")
- can_use_sketch = not set(SKETCH_MOVE_IDS).isdisjoint(species_.moves)
- for _ in range(record.int()):
- move = record.str()
- if move:
- if can_use_sketch and move not in move_syms:
- logging.debug('invalid move id (Sketched): %s', move)
- elif move not in species_.moves and not can_use_sketch:
- logging.debug('invalid move id: %s', move)
- errors.append("invalid move")
- ppup = record.int()
- if not (0 <= ppup <= 3):
- logging.debug('invalid ppup for move id %s: %d', move, ppup)
- errors.append("invalid ppup")
- for _ in range(record.int()):
- move = record.str()
- if move:
- if can_use_sketch and move not in move_syms:
- logging.debug('invalid first move id (Sketched): %s', move)
- elif move not in species_.moves and not can_use_sketch:
- logging.debug('invalid first move id: %s', move)
- errors.append("invalid first move")
- gender = record.int()
- if gender not in species_.genders:
- logging.debug('invalid gender: %d', gender)
- errors.append("invalid gender")
- shiny = record.bool_or_none()
- ability = record.str()
- if ability and ability not in ability_syms:
- logging.debug('invalid ability: %s', ability)
- errors.append("invalid ability")
- ability_index = record.int_or_none()
- nature_id = record.str()
- nature_stats_id = record.str()
- ev_sum = 0
- for _ in range(6):
- iv = record.int()
- if not (0 <= iv <= IV_STAT_LIMIT):
- logging.debug('invalid IV: %d', iv)
- errors.append("invalid IV")
- ivmaxed = record.bool_or_none()
- ev = record.int()
- if not (0 <= ev <= EV_STAT_LIMIT):
- logging.debug('invalid EV: %d', ev)
- errors.append("invalid EV")
- ev_sum += ev
- if not (0 <= ev_sum <= EV_LIMIT):
- logging.debug('invalid EV sum: %d', ev_sum)
- errors.append("invalid EV sum")
- happiness = record.int()
- if not (0 <= happiness <= 255):
- logging.debug('invalid happiness: %d', happiness)
- errors.append("invalid happiness")
- name = record.str()
- if not (len(name) <= POKEMON_MAX_NAME_SIZE):
- logging.debug('invalid name: %s', name)
- errors.append("invalid name")
- poke_ball = record.str()
- if poke_ball and poke_ball not in item_syms:
- logging.debug('invalid pokeball: %s', poke_ball)
- errors.append("invalid pokeball")
- steps_to_hatch = record.int()
- pokerus = record.int()
- obtain_mode = record.int()
- obtain_map = record.int()
- obtain_text = record.str()
- obtain_level = record.int()
- hatched_map = record.int()
- cool = record.int()
- beauty = record.int()
- cute = record.int()
- smart = record.int()
- tough = record.int()
- sheen = record.int()
- for _ in range(record.int()):
- ribbon = record.str()
- if record.bool():
- m_item = record.str()
- m_msg = record.str()
- m_sender = record.str()
- m_species1 = record.int_or_none()
- if m_species1:
- m_gender1 = record.int()
- m_shiny1 = record.bool()
- m_form1 = record.int()
- m_shadow1 = record.bool()
- m_egg1 = record.bool()
- m_species2 = record.int_or_none()
- if m_species2:
- m_gender2 = record.int()
- m_shiny2 = record.bool()
- m_form2 = record.int()
- m_shadow2 = record.bool()
- m_egg2 = record.bool()
- m_species3 = record.int_or_none()
- if m_species3:
- m_gender3 = record.int()
- m_shiny3 = record.bool()
- m_form3 = record.int()
- m_shadow3 = record.bool()
- m_egg3 = record.bool()
- if record.bool():
- logging.debug('Fused Mon')
- validate_pokemon()
- if EBDX_INSTALLED:
- superhue = record.str()
- if shiny and (superhue == ""):
- logging.debug('uninitialized supershiny')
- errors.append("uninitialized supershiny")
- supervarient = record.bool_or_none()
- logging.debug('-------')
- validate_pokemon()
- rest = record.raw_all()
- if rest:
- errors.append(f"remaining data: {', '.join(rest)}")
- except Exception as e:
- errors.append(str(e))
- if errors: logging.debug('Errors: %s', errors)
- logging.debug('--END PARTY VALIDATION--')
- return not errors
- return validate_party
- def find_changed_files(directory,old_files_hash):
- if os.path.isdir(directory):
- new_files_hash = dict ([(f, os.stat(os.path.join(directory,f)).st_mtime) for f in os.listdir(directory)])
- changed = old_files_hash.keys() != new_files_hash.keys()
- if not changed:
- for k in (old_files_hash.keys() & new_files_hash.keys()):
- if old_files_hash[k] != new_files_hash[k]:
- changed = True
- break
- if changed:
- logging.info('Refreshing Rules due to changes')
- return True,new_files_hash
- return False,old_files_hash
- def load_rules_files(directory,files_hash):
- rules = []
- for f in iter(files_hash):
- rule = []
- with open(os.path.join(directory,f)) as rule_file:
- for num,line in enumerate(rule_file):
- line = line.strip()
- if num == 3:
- rule.extend(line.split(','))
- else:
- rule.append(line)
- rules.append(rule)
- return rules
- if __name__ == "__main__":
- parser = argparse.ArgumentParser()
- parser.add_argument("--host", default=HOST,help='The host IP Address to run this server on. Should be 0.0.0.0 for Google Cloud.')
- parser.add_argument("--port", default=PORT,help='The port the server is listening on.')
- parser.add_argument("--pbs_dir", default=PBS_DIR,help='The path, relative to the working directory, where the PBS files are located.')
- parser.add_argument("--rules_dir", default=RULES_DIR,help='The path, relative to the working directory, where the rules files are located.')
- parser.add_argument("--log", default="INFO",help='The log level of the server. Logging messages lower than the level are not written.')
- args = parser.parse_args()
- loglevel = getattr(logging, args.log.upper())
- if not isinstance(loglevel, int):
- raise ValueError('Invalid log level: %s' % loglevel)
- logging.basicConfig(format='%(asctime)s: %(levelname)s: %(message)s', filename=os.path.join(LOG_DIR,'server.log'), level=loglevel)
- logging.info('---------------')
- Server(args.host, int(args.port), args.pbs_dir,args.rules_dir).run()
- logging.shutdown()
Advertisement
Add Comment
Please, Sign In to add comment