SHARE
TWEET

Untitled

a guest Jul 17th, 2017 43 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. from threading import Thread
  2. import json
  3. from subprocess import Popen, PIPE
  4. import sys
  5. import time
  6. import subprocess
  7. import psutil
  8. import pika
  9. import traceback
  10.  
  11. try:
  12.     N_WORKERS = int(sys.argv[1])
  13. except:
  14.     N_WORKERS = 1
  15.  
  16.  
  17. def kill(proc_pid):
  18.     try:
  19.         process = psutil.Process(proc_pid)
  20.         for proc in process.children(recursive=True):
  21.             proc.kill()
  22.         process.kill()
  23.     except:
  24.         print("fail to kill process, maybe it was already dead")
  25.  
  26. class Worker(Thread):
  27.     def __init__(self,task_queue_name, result_queue_name):
  28.         super(Worker, self).__init__()
  29.         self.daemon = True
  30.         self.cancelled = False
  31.         print("worker creation")
  32.         self.result_queue = result_queue_name
  33.         self.task_queue = task_queue_name
  34.         self.proc = None
  35.         self.is_running = False
  36.         self.stdout = []
  37.         print("worker created")
  38.  
  39.     def run(self):
  40.         print("worker running")
  41.         self.is_running = True
  42.         cmd = ("python fib.py {0} {1}".format(self.task_queue, self.result_queue))
  43.         print(cmd)
  44.         proc = Popen(cmd.split(" "), stdout=PIPE, stderr=PIPE, universal_newlines=True)
  45.         self.proc = proc
  46.         print("PID --------------------- ",proc.pid)
  47.         for stdout_line in iter(proc.stdout.readline, ""):
  48.             self.stdout.append(stdout_line)
  49.             print(stdout_line.strip())
  50.             sys.stdout.flush()
  51.             if not self.is_running:
  52.                 break
  53.  
  54.         proc.stdout.close()
  55.         return_code = proc.wait()
  56.         self.is_running = False
  57.         if return_code:
  58.             print(subprocess.CalledProcessError(return_code, cmd))
  59.  
  60.     def stop(self):
  61.         print("Stop worker")
  62.         if self.proc:
  63.             kill(self.proc.pid)
  64.             self.proc = None
  65.  
  66.     def __del__(self):
  67.         print("Bye worker")
  68.         if self.proc:
  69.             self.stop()
  70.  
  71.     def cancel(self):
  72.         self.cancelled = True
  73.  
  74. class Daemon(object):
  75.     def __init__(self):
  76.         credentials = pika.PlainCredentials('roman', 'qwerty')
  77.         connect = False
  78.         while not connect:
  79.             try:
  80.                 print("try to connect...")
  81.                 self.connection = pika.BlockingConnection(pika.ConnectionParameters('termosim.tgtoil.com',4041,'scheduler',credentials))
  82.                 self.channel = self.connection.channel()
  83.                 connect = True
  84.             except:
  85.                 connect = False
  86.                 time.sleep(0.2)
  87.         self.channel.exchange_declare(exchange="daemons",
  88.                                       type="fanout")
  89.  
  90.         self.daemons_queue = self.channel.queue_declare(exclusive=True, auto_delete=True)
  91.  
  92.         self.channel.queue_bind(exchange='daemons',
  93.                            queue=self.daemons_queue.method.queue)
  94.  
  95.         self.msgs = []
  96.         print(self.daemons_queue)
  97.         start = dict(
  98.             action='new_daemon',
  99.             result_queue=self.daemons_queue.method.queue
  100.         )
  101.         self.channel.basic_publish(exchange='',
  102.                                    routing_key='manager',
  103.                                    body=json.dumps(start, sort_keys=True))
  104.  
  105.         self.channel.basic_consume(self.__callback,
  106.                                    queue=self.daemons_queue.method.queue)
  107.         self.workers = dict()
  108.  
  109.         self.channel.start_consuming()
  110.  
  111.     def create_workers(self,result_queue,task_queue):
  112.         self.workers[result_queue] = list()
  113.         for i in range(N_WORKERS):
  114.             w = Worker(task_queue, result_queue)
  115.             w.start()
  116.             self.workers[result_queue].append(w)
  117.  
  118.  
  119.     def __callback(self,ch, method, properties, body):
  120.         print("new message")
  121.         try:
  122.             data = json.loads(body.decode("utf-8"))
  123.             print(data)
  124.             action = data["action"]
  125.             if action == "start":
  126.                 task_queue = data["task_queue"]
  127.                 result_queue = data["result_queue"]
  128.                 create_workers = Thread(target=self.create_workers(result_queue, task_queue))
  129.                 create_workers.daemon = True
  130.                 create_workers.name = result_queue
  131.                 create_workers.start()
  132.                 create_workers._stop()
  133.                 print(task_queue, result_queue)
  134.  
  135.             elif action == "stop":
  136.                 result_queue = data["result_queue"]
  137.                 workers_for_remove = self.workers.pop(result_queue)
  138.                 for worker in workers_for_remove:
  139.                     worker.cancel()
  140.                     worker.stop()
  141.  
  142.             ch.basic_ack(delivery_tag=method.delivery_tag)
  143.         except:
  144.             e = sys.exc_info()[0]
  145.             print(e)
  146.             traceback.print_exc()
  147.  
  148.  
  149.  
  150. if __name__ == "__main__":
  151.  
  152.     sys.stdout.flush()
  153.     daemon = Daemon()
RAW Paste Data
Top