Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Daily pipeline: upload a local script to S3, create an EMR cluster, submit
# the configured steps to it, wait for the first submitted step to finish, and
# terminate the cluster.
#
# Names defined elsewhere in this file (not visible here): default_args,
# upload_file_to_S3, JOB_FLOW_OVERRIDES, step.
dag = DAG(
    'emr_job_flow_automatic_steps_17',
    default_args=default_args,
    schedule_interval="@daily",
    # Catch-up is enabled, but at most one run is active at a time, so
    # backfilled days are processed one after another rather than launching
    # many clusters concurrently.
    max_active_runs=1,
    catchup=True,
)

upload_to_S3_task = PythonOperator(
    task_id='upload_to_S3',
    python_callable=upload_file_to_S3,
    # NOTE(review): hard-coded local path / bucket; presumably this is the
    # script that `step` runs on the cluster -- confirm, consider config.
    op_kwargs={
        'filename': '/home/ab/projects/test.py',
        'key': 'test.py',
        'bucket_name': 'dep-buck',
    },
    dag=dag,
)

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow2',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag,
)

# Jinja template rendering to the job flow (cluster) id that cluster_creator
# returns via XCom. Built from its task_id so the three consumers below stay in
# sync if that task is ever renamed.
JOB_FLOW_ID_TEMPLATE = (
    "{{ task_instance.xcom_pull('"
    + cluster_creator.task_id
    + "', key='return_value') }}"
)

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id=JOB_FLOW_ID_TEMPLATE,
    aws_conn_id='aws_default',
    steps=step,
    dag=dag,
)

# Watches only the first step id returned by add_steps (index [0]).
# NOTE(review): if `step` holds several steps, later ones are not waited on
# and may still be running when the cluster is terminated -- confirm.
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id=JOB_FLOW_ID_TEMPLATE,
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag,
)

# trigger_rule='all_done': terminate the cluster once watch_step has finished
# in any state (success, failed, upstream_failed), not only on success. With
# the default 'all_success' rule a failed step or sensor would skip this task
# and could leave the cluster running.
# NOTE(review): remove_cluster is the only leaf task, and Airflow derives the
# DAG-run state from leaf tasks, so a run whose step failed may still be marked
# successful; add a failure-watcher leaf if run-level status matters.
cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id=JOB_FLOW_ID_TEMPLATE,
    aws_conn_id='aws_default',
    trigger_rule='all_done',
    dag=dag,
)

upload_to_S3_task >> cluster_creator >> step_adder >> step_checker >> cluster_remover
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement