Advertisement
Guest User

Untitled

a guest
Jun 25th, 2019
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.32 KB | None | 0 0
  1. import random
  2. import socket
  3. import sys
  4. import time
  5. from collections import deque
  6. from threading import Thread
  7.  
  8. import Pyro4
  9. from Public import Event, ProcessManager
  10.  
  11.  
  12. class Worker:
  13. SEED = 42
  14. TIMEOUT_ms = 500 # tempo maximo que o processo ficara aguardando antes de processar o prox evento
  15. JITTER_ms = 200
  16. SLEEP_LIMIT = 300
  17.  
  18. def __init__(self, id, log, host="localhost"):
  19. # super().__init__()
  20. self._run = True
  21.  
  22. self.host = host
  23. self.id = id
  24. self._process_queue = deque()
  25. self.log = log
  26.  
  27. self.file_name = self.id + ".txt"
  28. self._file = open(self.file_name, 'r')
  29. self._eof = False
  30.  
  31. def _read_line(self):
  32. l = self._file.readline()
  33. if not l:
  34. self._eof = True
  35. return
  36. mode, data = self._parse(l)
  37. if mode == -1:
  38. print("Erro na leitura do arquivo", file=sys.stderr)
  39. else:
  40. if mode == 's':
  41. ev = Event(origin=self.id, destination=self._sort_random_process(), mode=mode, data=data)
  42. else:
  43. ev = Event(origin=self.id, destination=self.id, mode=mode, data=data)
  44. self._process_queue.append(ev)
  45.  
  46. def _sort_random_process(self):
  47. # O processo destino para cada evento S devera ser escolhido de forma aleatoria
  48. l = self.log.get_process_list()
  49.  
  50. while True:
  51. i = random.randint(0, len(l) - 1)
  52. if l[i] != self.id:
  53. return l[i]
  54.  
  55. def _parse(self, line):
  56. # analisa as linhas do arquivo para evitar que algo errado tenha sido digitado
  57. line = line.lower().split()
  58. if line[0] == "s" or line[0] == "l":
  59. return line[0], ' '.join(line[1:])
  60.  
  61. return -1, -1 # caso de excecao
  62.  
  63. def register(self):
  64. """Inicia a thread para efetuar o registro """
  65. Thread(target=self._register, daemon=False).start()
  66.  
  67. def _register(self):
  68. port = self.log.register_process(self.id)
  69. Pyro4.Daemon.serveSimple({self: self.id}, ns=True, port=port)
  70.  
  71. def _wait(self, jitter=False):
  72. if jitter:
  73. time.sleep(random.randint(0, Worker.JITTER_ms) / 100)
  74. else:
  75. time.sleep(random.randint(0, Worker.SLEEP_LIMIT) / 100)
  76.  
  77. @Pyro4.expose
  78. def add_event(self, ev):
  79. """
  80. adiciona um evento a lista de eventos
  81. :param ev: Deve ser um dicionario (converter evento para dicionario)
  82. """
  83. ev = Event.dict_to_event(ev)
  84. self._process_queue.append(ev)
  85.  
  86. def _report_event(self, ev):
  87. self.log.report_event(ev.event_to_dict())
  88.  
  89. def processarEventosDaFila(self):
  90. try:
  91. ev = self._process_queue.popleft()
  92. if ev.mode == "l":
  93. print("\nprocessando um evento local", file=sys.stderr)
  94. print("dados do evento: ", ev.to_string(), file=sys.stderr)
  95. return ev
  96.  
  97. elif ev.mode == "s" and not ev.is_local(self.id):
  98. # pr_addr = self.log.get_process_address(ev.destination)
  99.  
  100. nameservice = Pyro4.locateNS(host=self.host)
  101. pr_addr = nameservice.lookup(ev.destination)
  102.  
  103. shared_obj = Pyro4.Proxy(pr_addr)
  104. shared_obj.add_event(ev.event_to_dict())
  105. else:
  106. print("\nProcessando um evento REMOTO", file=sys.stderr)
  107. print(ev.to_string(), file=sys.stderr)
  108.  
  109. return ev
  110. except:
  111. return False
  112.  
  113. def _get_ip(self):
  114. with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
  115. s.connect(("8.8.8.8", 80))
  116. return s.getsockname()[0]
  117.  
  118.  
  119.  
  120. def run(self):
  121. while not self.log.is_synchronised(): pass
  122. while self._run:
  123. self._wait(False)
  124. if not self._eof: self._read_line()
  125. ev = self.processarEventosDaFila()
  126. self._wait(True)
  127. if ev: self._report_event(ev)
  128.  
  129. def start(self):
  130. Thread(target=self.run, daemon=False, name=self.id).start()
  131.  
  132.  
  133. if __name__ == '__main__':
  134. id = sys.argv[1]
  135. ip = sys.argv[2]
  136.  
  137. # uri = ProcessManager.get_remote_object_uri(name=ProcessManager.LOG)
  138. nameservice = Pyro4.locateNS(host=ip)
  139. uri = nameservice.lookup(ProcessManager.LOG)
  140. Log = Pyro4.Proxy(uri)
  141.  
  142. worker = Worker(id, Log, host=ip)
  143. worker.register()
  144. worker.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement