Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import random
- import socket
- import sys
- import time
- from collections import deque
- from threading import Thread
- import Pyro4
- from Public import Event, ProcessManager
- class Worker:
- SEED = 42
- TIMEOUT_ms = 500 # tempo maximo que o processo ficara aguardando antes de processar o prox evento
- JITTER_ms = 200
- SLEEP_LIMIT = 300
- def __init__(self, id, log, host="localhost"):
- # super().__init__()
- self._run = True
- self.host = host
- self.id = id
- self._process_queue = deque()
- self.log = log
- self.file_name = self.id + ".txt"
- self._file = open(self.file_name, 'r')
- self._eof = False
- def _read_line(self):
- l = self._file.readline()
- if not l:
- self._eof = True
- return
- mode, data = self._parse(l)
- if mode == -1:
- print("Erro na leitura do arquivo", file=sys.stderr)
- else:
- if mode == 's':
- ev = Event(origin=self.id, destination=self._sort_random_process(), mode=mode, data=data)
- else:
- ev = Event(origin=self.id, destination=self.id, mode=mode, data=data)
- self._process_queue.append(ev)
- def _sort_random_process(self):
- # O processo destino para cada evento S devera ser escolhido de forma aleatoria
- l = self.log.get_process_list()
- while True:
- i = random.randint(0, len(l) - 1)
- if l[i] != self.id:
- return l[i]
- def _parse(self, line):
- # analisa as linhas do arquivo para evitar que algo errado tenha sido digitado
- line = line.lower().split()
- if line[0] == "s" or line[0] == "l":
- return line[0], ' '.join(line[1:])
- return -1, -1 # caso de excecao
- def register(self):
- """Inicia a thread para efetuar o registro """
- Thread(target=self._register, daemon=False).start()
- def _register(self):
- port = self.log.register_process(self.id)
- Pyro4.Daemon.serveSimple({self: self.id}, ns=True, port=port)
- def _wait(self, jitter=False):
- if jitter:
- time.sleep(random.randint(0, Worker.JITTER_ms) / 100)
- else:
- time.sleep(random.randint(0, Worker.SLEEP_LIMIT) / 100)
- @Pyro4.expose
- def add_event(self, ev):
- """
- adiciona um evento a lista de eventos
- :param ev: Deve ser um dicionario (converter evento para dicionario)
- """
- ev = Event.dict_to_event(ev)
- self._process_queue.append(ev)
- def _report_event(self, ev):
- self.log.report_event(ev.event_to_dict())
- def processarEventosDaFila(self):
- try:
- ev = self._process_queue.popleft()
- if ev.mode == "l":
- print("\nprocessando um evento local", file=sys.stderr)
- print("dados do evento: ", ev.to_string(), file=sys.stderr)
- return ev
- elif ev.mode == "s" and not ev.is_local(self.id):
- # pr_addr = self.log.get_process_address(ev.destination)
- nameservice = Pyro4.locateNS(host=self.host)
- pr_addr = nameservice.lookup(ev.destination)
- shared_obj = Pyro4.Proxy(pr_addr)
- shared_obj.add_event(ev.event_to_dict())
- else:
- print("\nProcessando um evento REMOTO", file=sys.stderr)
- print(ev.to_string(), file=sys.stderr)
- return ev
- except:
- return False
- def _get_ip(self):
- with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
- s.connect(("8.8.8.8", 80))
- return s.getsockname()[0]
- def run(self):
- while not self.log.is_synchronised(): pass
- while self._run:
- self._wait(False)
- if not self._eof: self._read_line()
- ev = self.processarEventosDaFila()
- self._wait(True)
- if ev: self._report_event(ev)
- def start(self):
- Thread(target=self.run, daemon=False, name=self.id).start()
- if __name__ == '__main__':
- id = sys.argv[1]
- ip = sys.argv[2]
- # uri = ProcessManager.get_remote_object_uri(name=ProcessManager.LOG)
- nameservice = Pyro4.locateNS(host=ip)
- uri = nameservice.lookup(ProcessManager.LOG)
- Log = Pyro4.Proxy(uri)
- worker = Worker(id, Log, host=ip)
- worker.register()
- worker.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement