Advertisement
Guest User

Untitled

a guest
Jun 16th, 2019
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.35 KB | None | 0 0
  1. dag = DAG('emr_job_flow_automatic_steps_17',
  2. default_args=default_args,
  3. schedule_interval="@daily",
  4. max_active_runs=1,
  5. catchup=True,
  6. )
  7.  
  8.  
  9. upload_to_S3_task = PythonOperator(
  10. task_id='upload_to_S3',
  11. python_callable=upload_file_to_S3,
  12. op_kwargs={
  13. 'filename': '/home/ab/projects/test.py',
  14. 'key': 'test.py',
  15. 'bucket_name': 'dep-buck',
  16. },
  17. dag=dag)
  18.  
  19. cluster_creator = EmrCreateJobFlowOperator(
  20. task_id='create_job_flow2',
  21. job_flow_overrides=JOB_FLOW_OVERRIDES,
  22. aws_conn_id='aws_default',
  23. emr_conn_id='emr_default',
  24. dag=dag
  25. )
  26.  
  27. step_adder = EmrAddStepsOperator(
  28. task_id='add_steps',
  29. job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
  30. aws_conn_id='aws_default',
  31. steps=step,
  32. dag=dag
  33. )
  34. step_checker = EmrStepSensor(
  35. task_id='watch_step',
  36. job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
  37. step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
  38. aws_conn_id='aws_default',
  39. dag=dag
  40. )
  41.  
  42. cluster_remover = EmrTerminateJobFlowOperator(
  43. task_id='remove_cluster',
  44. job_flow_id="{{ task_instance.xcom_pull('create_job_flow2', key='return_value') }}",
  45. aws_conn_id='aws_default',
  46. dag=dag
  47. )
  48.  
  49. upload_to_S3_task >> cluster_creator >> step_adder >> step_checker >> cluster_remover
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement