Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- from eventemitter import EventEmitter
- from google.protobuf.internal.containers import RepeatedScalarFieldContainer
- from dota2.enums import EGCBaseClientMsg, ESOMsg, ESOType
- from dota2.protobufs import base_gcmessages_pb2 as _gc_base
- from dota2.protobufs import dota_gcmessages_client_pb2 as _gc_client
- from dota2.protobufs import dota_gcmessages_common_pb2 as _gc_common
- from dota2.protobufs import dota_gcmessages_common_match_management_pb2 as \
- _gc_common_match_management
class ProtoNotFound(Exception):
    """Raised when no protobuf message can be located for a cache type.

    Derives from :class:`Exception` rather than :class:`BaseException`:
    ``BaseException`` is meant for interpreter-level conditions such as
    ``SystemExit`` and ``KeyboardInterrupt``, and using it here would let
    this error slip past ordinary ``except Exception`` handlers.
    """
def find_so_proto(type_id):
    """Resolve the proto message named after the given SO type.

    The generated protobuf modules are searched in a fixed order (base,
    client, common, common match management) for an attribute whose name
    equals ``type_id.name``; the first module that provides one wins.

    :param type_id: SO type
    :type type_id: :class:`dota2.enums.ESOType`
    :returns: proto message, or `None` when ``type_id`` is not an
        :class:`dota2.enums.ESOType`
    :raises ProtoNotFound: when none of the searched modules has a match
    """
    if not isinstance(type_id, ESOType):
        return None

    search_order = (
        _gc_base,
        _gc_client,
        _gc_common,
        _gc_common_match_management,
    )

    found = None
    for module in search_order:
        found = getattr(module, type_id.name, None)
        if found is not None:
            break

    if not found:
        raise ProtoNotFound("Unable to locate proto for: %s" % repr(type_id))
    return found
class SOBase(object):
    """Base class that equips an instance with a shared-object cache.

    The instance passes itself to :class:`SOCache`, which registers its
    message handlers on it via ``.on(...)``.
    """

    def __init__(self):
        super(SOBase, self).__init__()

        #: Shared Object Caches
        self.socache = SOCache(self)
class SOCache(EventEmitter, dict):
    """Dictionary of decoded shared objects, keyed by :class:`ESOType`.

    The cache subscribes to the SO-related messages of the client it is
    given, decodes the objects they carry, stores them under their type,
    mirrors lobby/party state onto the client (``lobby``, ``lobby_invite``,
    ``party``, ``party_invite``) and emits events on both the client and
    itself.
    """

    version = None  #: cache version
    owner_soid = None  #: type and steamid, no clue what type represents

    def __init__(self, dota_client):
        """Bind the cache to ``dota_client`` and register message handlers.

        :param dota_client: object exposing ``on(event, callback)`` and
            ``emit(event, *args)``; lobby/party state is written onto it
        """
        self._dota = dota_client
        self._LOG = logging.getLogger("SOCache")

        # register our handlers
        dota_client.on(ESOMsg.CacheSubscribed,
                       self._handle_cache_subscribed)
        dota_client.on(ESOMsg.UpdateMultiple,
                       self._handle_update_multiple)
        dota_client.on(ESOMsg.CacheUnsubscribed,
                       self._handle_cache_unsubscribed)
        dota_client.on(ESOMsg.Destroy,
                       self._handle_cache_destroy)
        dota_client.on(ESOMsg.Create,
                       self._handle_cache_create)
        dota_client.on(EGCBaseClientMsg.EMsgGCClientWelcome,
                       self._handle_client_welcome)

    def __hash__(self):
        # pretend that we are a hashable dict, lol
        # don't attach more than one SOCache per DotaClient
        return hash((self._dota, 42))

    def emit(self, event, *args):
        """Log the event name (when not ``None``) and forward to the base emitter."""
        if event is not None:
            self._LOG.debug("Emit event: %s", repr(event))
        super(SOCache, self).emit(event, *args)

    def __get_validated_proto(self, type_id):
        """Return the proto for ``type_id``; log and re-raise ``ProtoNotFound``."""
        try:
            return find_so_proto(type_id)
        except ProtoNotFound as e:
            # Exceptions have no ``.message`` attribute on Python 3; let the
            # logger format the exception itself.
            self._LOG.error("%s", e)
            raise

    def __get_validated_type_id(self, type_id):
        """Convert a raw type id to :class:`ESOType`; log and re-raise ``ValueError``."""
        try:
            return ESOType(type_id)
        except ValueError:
            self._LOG.error("Unsupported type: %d", type_id)
            raise

    def _sync_type_id(self, proto, type_id, cache):
        """Decode ``cache.object_data`` with ``proto`` and store it under ``type_id``."""
        data = cache.object_data
        if isinstance(data, RepeatedScalarFieldContainer):
            # Repeated field: only the first serialized object is decoded.
            data = data[0]
        self[type_id] = proto.FromString(data)

    def _update_cache(self, cache):
        """Decode and store one object, then publish the matching update event.

        :param cache: message exposing ``type_id`` and ``object_data``
        :returns: ``False`` when the type is unsupported or has no proto,
            otherwise ``None``
        """
        try:
            type_id = self.__get_validated_type_id(cache.type_id)
            proto = self.__get_validated_proto(type_id)
            self._sync_type_id(proto, type_id, cache)
        except (ProtoNotFound, ValueError):
            return False

        if type_id == ESOType.CSODOTALobby:
            self._dota.lobby = self[type_id]
            if self._dota.verbose_debug:
                self._LOG.debug(
                    "Received lobby snapshot for lobby ID %s.",
                    self._dota.lobby.lobby_id)
            # NOTE(review): the event name is misspelled ("practive"); it is
            # kept as-is because listeners elsewhere may depend on it.
            # The payload is the lobby that was just stored (the previous
            # code passed the party here).
            self._dota.emit('practive_lobby_update', self._dota.lobby)
        elif type_id == ESOType.CSODOTALobbyInvite:
            self._dota.lobby_invite = self[type_id]
            if self._dota.verbose_debug:
                # Read the id from the invite itself; the lobby may be unset.
                self._LOG.debug(
                    "Received lobby invite snapshot for group ID %s.",
                    self._dota.lobby_invite.group_id)
            self._dota.emit('lobby_invite_update', self._dota.lobby_invite)
        elif type_id == ESOType.CSODOTAParty:
            self._dota.party = self[type_id]
            if self._dota.verbose_debug:
                self._LOG.debug(
                    "Received party snapshot for party ID %s.",
                    self._dota.party.party_id)
            self._dota.emit('party_update', self._dota.party)
        elif type_id == ESOType.CSODOTAPartyInvite:
            self._dota.party_invite = self[type_id]
            if self._dota.verbose_debug:
                self._LOG.debug(
                    "Received party invite snapshot for group ID %s.",
                    self._dota.party_invite.group_id)
            self._dota.emit('party_update_invite', self._dota.party_invite)
        else:
            if self._dota.verbose_debug:
                self._LOG.debug("Unknown cache ID at update: %s",
                                repr(type_id))

        self.emit("cache_updated", type_id)

    def _destroy_cache(self, cache):
        """Handle destruction of one object and clear the matching invite.

        :param cache: message exposing ``type_id`` and ``object_data``
        :returns: ``False`` when the type is unsupported or has no proto,
            otherwise ``None``
        """
        try:
            type_id = self.__get_validated_type_id(cache.type_id)
            proto = self.__get_validated_proto(type_id)
            self._sync_type_id(proto, type_id, cache)
        except (ProtoNotFound, ValueError):
            return False

        if type_id == ESOType.CSODOTAPartyInvite:
            self._dota.party_invite = None
            self._dota.emit('party_invite_cleared')
        elif type_id == ESOType.CSODOTALobbyInvite:
            self._dota.lobby_invite = None
            self._dota.emit('lobby_invite_cleared')
        else:
            if self._dota.verbose_debug:
                self._LOG.debug("Unknown cache ID at destroy: %s",
                                repr(type_id))

        self.emit("cache_destroyed", type_id)

    def _unsubscribe_cache(self, cache):
        """Clear whichever lobby/party state is owned by the unsubscribed cache.

        The owner id carried in ``cache.owner_soid.id`` is compared against
        the ids of the currently tracked lobby, lobby invite, party and
        party invite, in that order; the first match is cleared.

        :param cache: message exposing ``type_id``, ``object_data`` and
            ``owner_soid``
        :returns: ``False`` when the type is unsupported or has no proto,
            otherwise ``None``
        """
        try:
            type_id = self.__get_validated_type_id(cache.type_id)
            proto = self.__get_validated_proto(type_id)
            self._sync_type_id(proto, type_id, cache)
        except (ProtoNotFound, ValueError):
            return False

        owner_id = cache.owner_soid.id
        dota = self._dota

        # Use the same id fields the snapshot logging reads (``lobby_id`` /
        # ``party_id``) instead of a bare ``id`` attribute.
        if dota.lobby and dota.lobby.lobby_id == owner_id:
            dota.lobby = None
            dota.emit('practice_lobby_cleared')
        elif dota.lobby_invite and dota.lobby_invite.group_id == owner_id:
            dota.lobby_invite = None
            dota.emit('lobby_invite_cleared')
        elif dota.party and dota.party.party_id == owner_id:
            dota.party = None
            dota.emit('party_cleared')
        elif dota.party_invite and dota.party_invite.group_id == owner_id:
            dota.party_invite = None
            dota.emit('party_invite_cleared')
        else:
            if dota.verbose_debug:
                self._LOG.debug("Unknown cache ID at unsubscribe: %s",
                                repr(type_id))

        self.emit("cache_unsubscribed", type_id)

    def _handle_client_welcome(self, message):
        """Process every out-of-date subscribed cache listed in the welcome."""
        for one in message.outofdate_subscribed_caches:
            self._handle_cache_subscribed(one)

    def _handle_cache_subscribed(self, message):
        """Update the cache from each entry of ``message.objects``."""
        for cache in message.objects:
            self._update_cache(cache)

    def _handle_update_multiple(self, message):
        """Update the cache from each entry of ``message.objects_modified``."""
        for cache in message.objects_modified:
            self._update_cache(cache)

    def _handle_cache_unsubscribed(self, message):
        """Forward an unsubscribe message to :meth:`_unsubscribe_cache`."""
        self._unsubscribe_cache(message)

    def _handle_cache_destroy(self, message):
        """Forward a destroy message to :meth:`_destroy_cache`."""
        self._destroy_cache(message)

    def _handle_cache_create(self, message):
        """Treat a create message as an update of the created object."""
        self._update_cache(message)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement