Advertisement
Guest User

Untitled

a guest
Feb 11th, 2016
149
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.36 KB | None | 0 0
import datetime
import logging
# The from-import below rebinds the name `datetime` to the class; it must stay
# after the plain `import datetime` above.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import BashOperator, ExternalTaskSensor

from telemetry_pipeline_utils import *
  8. # constants
  9.  
  10. START = datetime.combine(datetime.today() - timedelta(days=2), datetime.min.time()) + timedelta(hours=10)
  11. DAG_NAME = 'emr_model_building'
  12.  
  13. default_args = {
  14. 'pool': 'emr_model_building',
  15. 'depends_on_past':False,
  16. 'start_date': START,
  17. 'retries': 1,
  18. 'retry_delay': timedelta(seconds=120),
  19. 'email_on_failure': True,
  20. 'email_on_retry': True
  21. }
  22.  
  23. dag = DAG(DAG_NAME, default_args=default_args, schedule_interval='0 1 * * *')
  24.  
  25. launch_emr = """
  26. {% if params.ENV == "PROD" %}
  27. echo "Launching EMR cluster in Prod Env"
  28. source ~/.bash_profile; /home/deploy/automation/roles/cluster/cluster.sh launch,provision,deploy model_building_prod.conf
  29. {% else %}
  30. echo "Launching EMR cluster in Stage Env"
  31. source ~/.bash_profile; /home/deploy/automation/roles/cluster/cluster.sh launch,provision,deploy model_building_stage.conf
  32. {% endif %}
  33. """
  34.  
  35. run_sm_and_reputation = """
  36. {% if params.ENV == "PROD" %}
  37. echo "Building sender models in Prod Env"
  38. source ~/.bash_profile; /home/deploy/automation/roles/cluster/cluster.sh sd model_building_prod.conf
  39. {% else %}
  40. echo "Building sender models in Stage Env"
  41. source ~/.bash_profile; /home/deploy/automation/roles/cluster/cluster.sh sd model_building_stage.conf
  42. {% endif %}
  43. """
  44.  
  45. run_cdd = """
  46. {% if params.ENV == "PROD" %}
  47. echo "Building CDD in Prod Env"
  48. source ~/.bash_profile; /home/deploy/automation/roles/cluster/cluster.sh cdd model_building_prod.conf
  49. {% else %}
  50. echo "Building CDD in Stage Env"
  51. source ~/.bash_profile; /home/deploy/automation/roles/cluster/cluster.sh cdd model_building_stage.conf
  52. {% endif %}
  53. """
  54.  
  55. terminate_cluster = """
  56. {% if params.import_terminate_emr_cluster == true %}
  57. {% if params.ENV == "PROD" %}
  58. echo "Terminating EMR cluster in Prod Env"
  59. source ~/.bash_profile; /home/deploy/automation/roles/cluster/cluster.sh terminate model_building_prod.conf
  60. {% else %}
  61. echo "Terminating EMR cluster in Stage Env"
  62. source ~/.bash_profile; /home/deploy/automation/roles/cluster/cluster.sh terminate model_building_stage.conf
  63. {% endif %}
  64. {% else %}
  65. echo "NOT terminating EMR cluster"
  66. {% endif %}
  67. """
  68.  
  69. t0 = ExternalTaskSensor(
  70. task_id='wait_for_previous_run',
  71. trigger_rule='one_success',
  72. external_dag_id=DAG_NAME,
  73. external_task_id='terminate_cluster',
  74. allowed_states=['success'],
  75. execution_delta=timedelta(days=1),
  76. dag=dag)
  77.  
  78. t1 = BashOperator(
  79. task_id='launch_emr',
  80. bash_command=launch_emr,
  81. execution_timeout=timedelta(hours=6),
  82. pool='emr_model_building',
  83. params={'ENV': ENV, 'import_terminate_emr_cluster':import_terminate_emr_cluster},
  84. dag=dag)
  85.  
  86. t2 = BashOperator(
  87. task_id='run_sm_and_reputation',
  88. bash_command=run_sm_and_reputation,
  89. execution_timeout=timedelta(hours=3),
  90. pool='emr_model_building',
  91. params={'ENV': ENV},
  92. dag=dag)
  93.  
  94. t3 = BashOperator(
  95. task_id='run_cdd',
  96. bash_command=run_cdd,
  97. execution_timeout=timedelta(hours=3),
  98. pool='emr_model_building',
  99. params={'ENV': ENV},
  100. dag=dag)
  101.  
  102. t4 = BashOperator(
  103. task_id='terminate_cluster',
  104. bash_command=terminate_cluster,
  105. execution_timeout=timedelta(hours=1),
  106. params={'ENV': ENV, 'import_terminate_emr_cluster':import_terminate_emr_cluster},
  107. pool='emr_model_building',
  108. dag=dag)
  109.  
  110. t1.set_upstream(t0)
  111. t2.set_upstream(t1)
  112. t3.set_upstream(t2)
  113. t4.set_upstream(t3)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement