Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from threading import Thread
- import json
- from subprocess import Popen, PIPE
- import sys
- import time
- import subprocess
- import psutil
- import pika
- import traceback
# Number of Worker threads started for each "start" request. Read from the
# first command-line argument; falls back to 1 when the argument is missing
# (IndexError) or is not an integer (ValueError).
try:
    N_WORKERS = int(sys.argv[1])
except (IndexError, ValueError):
    N_WORKERS = 1
def kill(proc_pid):
    """Kill the process with PID *proc_pid* and all of its descendants.

    Descendants are killed first, then the process itself. The operation is
    best-effort: a process that cannot be looked up or killed (for example
    because it has already exited) is reported on stdout and skipped, so one
    dead child no longer prevents the remaining processes from being killed.

    Args:
        proc_pid: PID of the root process to terminate.
    """
    try:
        process = psutil.Process(proc_pid)
        children = process.children(recursive=True)
    except psutil.Error:
        print("fail to kill process, maybe it was already dead")
        return

    # Children first, parent last, each handled independently.
    for proc in children + [process]:
        try:
            proc.kill()
        except psutil.Error:
            print("fail to kill process, maybe it was already dead")
class Worker(Thread):
    """Daemon thread that runs one ``fib.py`` subprocess and relays its output.

    The subprocess is started as ``python fib.py <task_queue> <result_queue>``.
    Every line it writes is appended to ``self.stdout`` and echoed to this
    process's stdout.

    Attributes set in ``__init__``:
        task_queue / result_queue: queue names passed to the subprocess.
        proc: the running ``Popen`` object, or ``None`` when not started
            or already stopped.
        is_running: ``True`` while ``run()`` is relaying output.
        cancelled: set by ``cancel()``; makes ``run()`` stop early.
        stdout: list of output lines captured so far.
    """

    def __init__(self, task_queue_name, result_queue_name):
        super(Worker, self).__init__()
        self.daemon = True
        self.cancelled = False
        print("worker creation")
        self.result_queue = result_queue_name
        self.task_queue = task_queue_name
        self.proc = None
        self.is_running = False
        self.stdout = []
        print("worker created")

    def _command(self):
        """Return the subprocess argv as a list.

        Building the list directly (instead of splitting a formatted string
        on spaces) keeps queue names that contain spaces as one argument.
        """
        return ["python", "fib.py", str(self.task_queue), str(self.result_queue)]

    def run(self):
        """Start the subprocess and relay its output until it ends or is cancelled."""
        print("worker running")
        if self.cancelled:
            # Cancelled before the thread got to run: do not spawn anything.
            return
        self.is_running = True
        argv = self._command()
        cmd = " ".join(argv)
        print(cmd)
        # stderr is merged into stdout: a separate stderr pipe that nobody
        # reads would block the child once the pipe buffer fills up.
        proc = Popen(argv, stdout=PIPE, stderr=subprocess.STDOUT,
                     universal_newlines=True)
        self.proc = proc
        print("PID --------------------- ", proc.pid)
        try:
            for stdout_line in iter(proc.stdout.readline, ""):
                self.stdout.append(stdout_line)
                print(stdout_line.strip())
                sys.stdout.flush()
                if self.cancelled or not self.is_running:
                    break
        finally:
            # Always release the pipe, even if relaying the output failed.
            proc.stdout.close()
        return_code = proc.wait()
        self.is_running = False
        if return_code:
            # Only reported, not raised: nobody could catch it in this thread.
            print(subprocess.CalledProcessError(return_code, cmd))

    def stop(self):
        """Kill the subprocess (and its children) if one is running."""
        print("Stop worker")
        proc = self.proc
        if proc:
            kill(proc.pid)
        self.proc = None

    def __del__(self):
        print("Bye worker")
        # getattr: __del__ may run on an instance whose __init__ did not finish.
        if getattr(self, "proc", None):
            self.stop()

    def cancel(self):
        """Ask ``run()`` to stop relaying output at the next opportunity."""
        self.cancelled = True
class Daemon(object):
    """Connects to the broker, announces itself and manages Worker groups.

    On construction the daemon:
      1. connects to the AMQP broker, retrying every 0.2 s until it succeeds;
      2. declares the ``daemons`` fanout exchange and binds a private,
         exclusive, auto-delete queue to it;
      3. publishes a ``new_daemon`` message (carrying that queue's name) to
         the ``manager`` routing key;
      4. starts consuming from its queue. This blocks, so ``__init__`` does
         not return while the consumer loop is running.

    Incoming messages are JSON objects with an ``action`` key:
      * ``start``: needs ``task_queue`` and ``result_queue``; starts
        ``N_WORKERS`` workers for that pair.
      * ``stop``: needs ``result_queue``; cancels and stops the workers
        registered under it.

    ``self.workers`` maps a result-queue name to its list of Worker objects.

    NOTE(review): the ``type=`` keyword of ``exchange_declare``, the
    callback-first ``basic_consume`` and the nameless ``queue_declare`` look
    like the pika 0.x call style; confirm against the installed pika version.
    """

    def __init__(self):
        # NOTE(security): broker host and credentials are hard-coded here;
        # consider loading them from configuration or the environment.
        credentials = pika.PlainCredentials('roman', 'qwerty')
        connect = False
        while not connect:
            try:
                print("try to connect...")
                self.connection = pika.BlockingConnection(
                    pika.ConnectionParameters(
                        'termosim.tgtoil.com', 4041, 'scheduler', credentials))
                self.channel = self.connection.channel()
                connect = True
            except Exception:
                # Not a bare ``except``: Ctrl-C / SystemExit must still be
                # able to break out of the retry loop.
                connect = False
                time.sleep(0.2)
        self.channel.exchange_declare(exchange="daemons",
                                      type="fanout")
        self.daemons_queue = self.channel.queue_declare(exclusive=True, auto_delete=True)
        self.channel.queue_bind(exchange='daemons',
                                queue=self.daemons_queue.method.queue)
        self.msgs = []
        print(self.daemons_queue)
        start = dict(
            action='new_daemon',
            result_queue=self.daemons_queue.method.queue
        )
        self.channel.basic_publish(exchange='',
                                   routing_key='manager',
                                   body=json.dumps(start, sort_keys=True))
        self.channel.basic_consume(self.__callback,
                                   queue=self.daemons_queue.method.queue)
        self.workers = dict()
        self.channel.start_consuming()

    def create_workers(self, result_queue, task_queue):
        """Start ``N_WORKERS`` workers and register them under *result_queue*.

        Workers are appended to any list already registered for the same
        result queue, so a repeated "start" does not drop the handles of
        workers that are still running (they could never be stopped then).
        """
        group = self.workers.setdefault(result_queue, [])
        for _ in range(N_WORKERS):
            w = Worker(task_queue, result_queue)
            w.start()
            group.append(w)

    def __callback(self, ch, method, properties, body):
        """Handle one control message; acknowledge it when handled successfully.

        Any failure while decoding or handling the message is printed with
        its traceback and the message is left unacknowledged.
        """
        print("new message")
        try:
            data = json.loads(body.decode("utf-8"))
            print(data)
            action = data["action"]
            if action == "start":
                task_queue = data["task_queue"]
                result_queue = data["result_queue"]
                # Called directly: starting the Worker threads is quick, and
                # the workers themselves do the long-running work.
                self.create_workers(result_queue, task_queue)
                print(task_queue, result_queue)
            elif action == "stop":
                result_queue = data["result_queue"]
                # Default to an empty list: a "stop" for an unknown queue is
                # a no-op, not an error.
                for worker in self.workers.pop(result_queue, []):
                    worker.cancel()
                    worker.stop()
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:
            e = sys.exc_info()[0]
            print(e)
            traceback.print_exc()
if __name__ == "__main__":
    # Script entry point. Flush whatever is already buffered on stdout, then
    # build the Daemon; its constructor connects to the broker and enters the
    # blocking consume loop.
    sys.stdout.flush()
    daemon = Daemon()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement