SHARE
TWEET

Untitled

a guest Jul 21st, 2019 72 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import time
  2. import os
  3. import signal
  4. import subprocess
  5. import psycopg2
  6.  
  7. from celery import Celery
  8. from celery.task.control import revoke
  9. from kombu import Exchange, Queue
  10. import sqlalchemy as db
  11. from config import Config
  12. from app.models import TaskDetail
  13.  
  # Celery application and its settings. The settings live in a plain dict
  # that is handed to config_from_object() in one go.
  celery_app = Celery()
  celeryconfig = {
      'BROKER_URL': 'amqp://',
      # 'CELERY_RESULT_BACKEND': 'redis://localhost',
      'CELERY_QUEUES': (
          Queue(
              'tasks',
              Exchange('tasks'),
              routing_key='tasks',
              # Declare the queue with priority support (levels up to 10) so
              # apply_async(priority=...) can be exercised against it.
              queue_arguments={'x-max-priority': 10},
          ),
      ),
      'CELERY_ACKS_LATE': True,
      'CELERYD_PREFETCH_MULTIPLIER': 1,
  }
  celery_app.config_from_object(celeryconfig)
  25.  
  26. # celery worker -c 1 -A tasks -Q tasks --loglevel=info
  27. # ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill
  28. # amqp://jm-user1:sample@localhost/jm-vhost
  29.  
  30. # rabbitmqctl stop
  31. # rabbitmq-server -detached
  32.  
  33. ###
  34.  
  35. # The idea of this sample project is to simulate transcoding a video into
  36. # different dimensions using Celery priority task queues, and to test
  37. # whether the queue actually honours task priority.
  38.  
  39. # example: transcode_360p.apply_async(queue='tasks', priority=1)
  40.  
  41. ###
  42.  
  43.  
  def _update_task_detail(connection, task_detail, task_detail_id, **values):
      """Write ``values`` to the ``task_detail`` row with id ``task_detail_id``."""
      query = db.update(task_detail).values(**values)
      query = query.where(task_detail.columns.id == task_detail_id)
      connection.execute(query)


  @celery_app.task(bind=True, soft_time_limit=2)
  def run_transcode(self, command, task_detail_id):
      """Run ``command`` in a shell and record its progress in ``task_detail``.

      The row identified by ``task_detail_id`` is looked up first; the command
      is then started, the row is marked ``RUNNING`` together with the child's
      PID, and once the child exits the row receives its final status and the
      captured output (newlines replaced by ``<br />``) in the ``log`` column.

      The final status is derived from the child's exit code: ``SUCCESS`` for
      0, ``FAILED`` otherwise. The stored log is stderr when it is non-empty,
      else stdout.

      Args:
          command: Shell command line to execute (passed with ``shell=True``,
              so it must come from a trusted source).
          task_detail_id: Primary key of the ``task_detail`` row to update.

      Returns:
          None. Any exception (missing row, non-zero exit code, soft time
          limit, database error) is printed together with the task id and the
          task then revokes itself with ``terminate=True``.
      """
      proc = None
      try:
          # NOTE(review): this sleep is longer than the task's soft_time_limit
          # of 2 seconds, so as written the task is expected to hit the soft
          # time limit before doing any work. Kept as-is because it looks like
          # a deliberate timeout experiment -- confirm before removing.
          time.sleep(7)

          engine = db.create_engine(Config.SQLALCHEMY_DATABASE_URI)
          try:
              # The context manager closes the connection on every exit path,
              # including the error paths that previously leaked it.
              with engine.connect() as connection:
                  metadata = db.MetaData()
                  task_detail = db.Table(
                      'task_detail', metadata, autoload=True, autoload_with=engine
                  )
                  query = db.select([task_detail]).where(
                      task_detail.columns.id == task_detail_id
                  )
                  if connection.execute(query).fetchone() is None:
                      raise Exception(
                          'TRANSCODE ERROR: Not found task with id: {}'.format(task_detail_id)
                      )

                  proc = subprocess.Popen(
                      command,
                      stdout=subprocess.PIPE,
                      stderr=subprocess.PIPE,
                      shell=True,
                  )
                  _update_task_detail(
                      connection, task_detail, task_detail_id,
                      status='RUNNING', process_id=proc.pid,
                  )

                  # Debug echo of the task id on the worker's stdout.
                  # os.system() reports failure through its return value, not
                  # an exception, so no error handling is attached here.
                  os.system('echo ' + str(task_detail_id))

                  out, err = proc.communicate()

                  # Prefer stderr for the log when the command wrote to it.
                  logs = err if err else out
                  # errors='replace' keeps non-UTF-8 output from crashing the
                  # task after the command itself has already finished.
                  log_task = logs.replace(b'\n', b'<br />').decode('utf-8', errors='replace')

                  succeeded = proc.returncode == 0
                  _update_task_detail(
                      connection, task_detail, task_detail_id,
                      status='SUCCESS' if succeeded else 'FAILED',
                      log=log_task,
                  )

                  if not succeeded:
                      # The child has already exited and been reaped by
                      # communicate(); there is nothing left to signal.
                      raise Exception('TRANSCODE FAILED!')
          finally:
              engine.dispose()
      except Exception as exe:
          # If the failure happened while the child was still running (for
          # example the soft time limit fired inside communicate()), stop it
          # instead of leaving it orphaned. With shell=True this signals the
          # shell that was spawned for the command.
          if proc is not None and proc.poll() is None:
              proc.terminate()
          print(self.request.id)
          print(exe)
          # NOTE(review): the task revokes itself with terminate=True and
          # swallows the exception; kept from the original -- confirm this is
          # the intended way to end a failed run.
          celery_app.control.revoke(self.request.id, terminate=True)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top