import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.sensors import HdfsSensor

from altx_data_pipeline.ingestion.test2 import tasks
from altx_data_pipeline.ingestion.test2 import hql


logger = logging.getLogger(__name__)

DAG_ID = 'my-bigdata-dag'

default_args = {
    'owner': 'Mehmet Vergili',
    'start_date': datetime(2017, 11, 20),
    'depends_on_past': False,
    'email': 'mehmet.vergili@gmail.com',
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(dag_id=DAG_ID,
          default_args=default_args,
          schedule_interval=timedelta(days=1))

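# Wait for the day's source file to land in HDFS. Note that with
# poke_interval=10 and timeout=5 the sensor gives up before its second
# poke; raise the timeout if the file can arrive late.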
source_data_sensor = HdfsSensor(
    task_id='source_data_sensor',
    filepath='/data/mydata/{{ ds }}/file.csv',
    poke_interval=10,
    timeout=5,
    dag=dag
)

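# Recreate the target Hive database from scratch on every run.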
create_hive_db = HiveOperator(
    task_id='create_hive_db',
    hql="DROP DATABASE IF EXISTS {db} CASCADE; CREATE DATABASE {db};".format(db='my_hive_db'),
    dag=dag
)
create_hive_db.set_upstream(source_data_sensor)

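# Load the raw CSV from HDFS into the Hive table, staging it through a
# temporary table (see hql.HQL_HDFS_TO_HIVE_TRANSFER).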
hdfs_to_hive_transfer = HiveOperator(
    task_id='hdfs_to_hive_transfer',
    hql=hql.HQL_HDFS_TO_HIVE_TRANSFER.format(table_name='mydata',
                                             tmp_table_name='mydata_tmp',
                                             hdfs_path='/data/mydata/{{ ds }}'),
    schema='my_hive_db',
    dag=dag
)
hdfs_to_hive_transfer.set_upstream(create_hive_db)

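# Branch on the row count of the loaded table: tasks.count_data_rows is
# expected to return the task_id to follow, i.e. 'stop_flow' when the
# table is empty and 'clean_data' otherwise.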
count_data_rows = BranchPythonOperator(
    task_id='count_data_rows',
    python_callable=tasks.count_data_rows,
    templates_dict={'schema': 'my_hive_db'},
    provide_context=True,
    dag=dag
)
count_data_rows.set_upstream(hdfs_to_hive_transfer)

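# Dead-end branch: nothing left to do when the table has no rows.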
stop_flow = DummyOperator(
    task_id='stop_flow',
    dag=dag
)

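# Generate a source id for this run; the returned value is pushed to
# XCom so downstream tasks can pull it.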
create_source_id = PythonOperator(
    task_id='create_source_id',
    python_callable=tasks.create_source_id,
    templates_dict={'source': 'mydata'},
    provide_context=True,
    dag=dag
)
create_source_id.set_upstream(source_data_sensor)

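# Clean the raw table into clean_mydata, tagging rows with the source id
# pulled from create_source_id's XCom. clean_data runs only when both
# create_source_id has finished and the branch chose it.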
clean_data = HiveOperator(
    task_id='clean_data',
    hql=hql.HQL_CLEAN_DATA.format(source_id="{{ task_instance.xcom_pull(task_ids='create_source_id') }}",
                                  clean_mydata='clean_mydata',
                                  mydata='mydata'),
    schema='my_hive_db',
    dag=dag
)
clean_data.set_upstream(create_source_id)
count_data_rows.set_downstream([stop_flow, clean_data])

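# Move the cleaned rows from Hive into MSSQL (per the tasks module's
# move_data_mssql callable).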
move_data_mssql = PythonOperator(
    task_id='move_data_mssql',
    python_callable=tasks.move_data_mssql,
    templates_dict={'schema': 'my_hive_db'},
    provide_context=True,
    dag=dag
)
move_data_mssql.set_upstream(clean_data)

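# Notify by email once the day's ingestion has finished.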
send_email = EmailOperator(
    task_id='send_email',
    to='mehmet.vergili@gmail.com',
    subject='ingestion complete',
    html_content="Date: {{ ds }}",
    dag=dag)

send_email.set_upstream(move_data_mssql)