Advertisement
Guest User

Untitled

a guest
Sep 19th, 2019
122
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.36 KB | None | 0 0
  1. from airflow.exceptions import AirflowException
  2. from airflow import models
  3. from airflow import DAG
  4. from datetime import datetime, timedelta
  5. from airflow.utils.email import send_email
  6. from dateutil.relativedelta import relativedelta
  7. import os
  8.  
  9. schedule_interval_dag = timedelta(days=1)
  10.  
  11. default_dag_args = {
  12. # Setting start date as yesterday starts the DAG immediately when it is
  13. # detected in the Cloud Storage bucket.
  14. # set your start_date : airflow will run previous dags if dags since startdate has not run
  15. 'start_date': '2019-08-30',
  16. 'email_on_failure': True,
  17. 'email_on_retry': True,
  18. 'project_id' : 'your project id',
  19. 'retries': 1,
  20. 'on_failure_callback': notify_email,
  21. 'retry_delay': timedelta(minutes=5),
  22. }
  23.  
  24. with models.DAG(
  25. dag_id='dag name',
  26. # Continue to run DAG once per day
  27. schedule_interval = schedule_interval_dag,
  28. catchup = True,
  29. default_args=default_dag_args) as dag:
  30.  
  31. check_data_source_1 =
  32. ExternalSensor.ExternalTaskSensor(
  33. task_id='check_data_source_1',
  34. external_dag_id='dag of data source 1',
  35. external_task_id= 'last task of the dag',
  36. execution_delta = timedelta(hours=1),
  37. timeout = 300)
  38.  
  39. check_data_source_2 =
  40. ExternalSensor.ExternalTaskSensor(
  41. task_id='check_data_source_2',
  42. external_dag_id='dag of data source 2',
  43. external_task_id= 'last task of the dag',
  44. execution_delta = timedelta(hours=1),
  45. timeout = 300)
  46.  
  47. check_external_data_source_1 =
  48. ExternalSensor.ExternalTaskSensor(
  49. task_id='check_external_data_source_1',
  50. external_dag_id='dag of external data source 1',
  51. external_task_id= 'last task of the dag',
  52. execution_delta = timedelta(hours=1),
  53. timeout = 300)
  54.  
  55. check_external_data_source_2 =
  56. ExternalSensor.ExternalTaskSensor(
  57. task_id='check_external_data_source_2',
  58. external_dag_id='dag of external data source 2',
  59. external_task_id= 'last task of the dag',
  60. execution_delta = timedelta(hours=1),
  61. timeout = 300)
  62.  
  63. transform_table_1 =
  64. # code for transfromation of table 1
  65.  
  66. check_data_source_1
  67. check_data_source_2
  68. check_external_data_source_1
  69. check_external_data_source_2
  70. transform_table_1.set_upstream([check_data_source_1,check_data_source_2,check_external_data_source_1,check_external_data_source_2])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement