# Daily ingestion DAG: wait for the day's file in HDFS, load it into Hive,
# clean it, move it to the relational store, then send a notification email.
import logging
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.sensors import HdfsSensor
from your_task_file_path import tasks
from your_hql_file_path import hql

logger = logging.getLogger(__name__)

DAG_ID = 'my-bigdata-dag'

default_args = {
    'owner': 'Mehmet Vergili',
    'start_date': datetime(2017, 11, 20),
    'depends_on_past': False,
    'email': 'mehmet.vergili@gmail.com',
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(dag_id=DAG_ID,
          default_args=default_args,
          schedule_interval=timedelta(days=1))

# Wait for the day's source file to land in HDFS before doing anything else.
source_data_sensor = HdfsSensor(
    task_id='source_data_sensor',
    filepath='/data/mydata/{{ ds }}/file.csv',
    poke_interval=10,
    timeout=5,
    dag=dag
)

# Recreate the target Hive database from scratch for this run.
create_hive_db = HiveOperator(
    task_id='create_hive_db',
    hql="DROP DATABASE IF EXISTS {db} CASCADE; CREATE DATABASE {db};".format(db='my_hive_db'),
    dag=dag
)
create_hive_db.set_upstream(source_data_sensor)

# Load the raw CSV from HDFS into a Hive table via the templated HQL.
hdfs_to_hive_transfer = HiveOperator(
    task_id='hdfs_to_hive_transfer',
    hql=hql.HQL_HDFS_TO_HIVE_TRANSFER.format(table_name='mydata',
                                             tmp_table_name='mydata_tmp',
                                             hdfs_path='/data/mydata/{{ ds }}'),
    schema='my_hive_db',
    dag=dag
)
hdfs_to_hive_transfer.set_upstream(create_hive_db)
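
# For reference only: a minimal sketch of what the HQL_HDFS_TO_HIVE_TRANSFER
# template in your_hql_file_path might contain. The column list and CSV layout
# below are placeholders/assumptions, not the author's actual schema.
HQL_HDFS_TO_HIVE_TRANSFER_SKETCH = """
DROP TABLE IF EXISTS {tmp_table_name};
CREATE EXTERNAL TABLE {tmp_table_name} (
    id STRING,
    payload STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '{hdfs_path}';

CREATE TABLE IF NOT EXISTS {table_name} (
    id STRING,
    payload STRING
)
STORED AS ORC;

INSERT OVERWRITE TABLE {table_name} SELECT * FROM {tmp_table_name};
DROP TABLE {tmp_table_name};
"""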

# Branch on the ingested row count: continue with cleaning, or stop the flow.
count_data_rows = BranchPythonOperator(
    task_id='count_data_rows',
    python_callable=tasks.count_data_rows,
    templates_dict={'schema': 'my_hive_db'},
    provide_context=True,
    dag=dag
)
count_data_rows.set_upstream(hdfs_to_hive_transfer)
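
# For reference only: a hedged sketch of what tasks.count_data_rows (imported
# from your_task_file_path above) might look like. The hook, connection and
# table name are assumptions; only the returned task_ids come from this DAG.
def count_data_rows_sketch(templates_dict=None, **context):
    """Return the task_id the BranchPythonOperator should follow."""
    from airflow.hooks.hive_hooks import HiveServer2Hook
    schema = templates_dict['schema']
    hook = HiveServer2Hook()  # assumes the default hiveserver2 connection
    rows = hook.get_records("SELECT COUNT(*) FROM {}.mydata".format(schema))
    row_count = rows[0][0] if rows else 0
    logger.info("row count for %s.mydata: %s", schema, row_count)
    return 'clean_data' if row_count else 'stop_flow'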

# No-op endpoint used when the branch decides there is nothing to process.
stop_flow = DummyOperator(
    task_id='stop_flow',
    dag=dag
)

# Generate a source id for this run; the return value is pushed to XCom.
create_source_id = PythonOperator(
    task_id='create_source_id',
    python_callable=tasks.create_source_id,
    templates_dict={'source': 'mydata'},
    provide_context=True,
    dag=dag
)
create_source_id.set_upstream(source_data_sensor)
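
# For reference only: a hedged sketch of what tasks.create_source_id might do.
# The id format is an assumption; whatever is returned here is what clean_data
# pulls back via xcom_pull(task_ids='create_source_id').
def create_source_id_sketch(templates_dict=None, ds=None, **context):
    """Build a per-run source id; PythonOperator pushes the return value to XCom."""
    source = templates_dict['source']
    source_id = '{}_{}'.format(source, ds)  # e.g. 'mydata_2017-11-20'
    logger.info("generated source_id: %s", source_id)
    return source_id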

# Clean the raw data, tagging each row with the source id pulled from XCom.
clean_data = HiveOperator(
    task_id='clean_data',
    hql=hql.HQL_CLEAN_DATA.format(source_id="{{ task_instance.xcom_pull(task_ids='create_source_id') }}",
                                  clean_mydata='clean_mydata', mydata='mydata'),
    schema='my_hive_db',
    dag=dag
)
clean_data.set_upstream(create_source_id)
count_data_rows.set_downstream([stop_flow, clean_data])
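
# For reference only: a minimal sketch of what the HQL_CLEAN_DATA template
# might contain. The filtering and column handling are placeholders; only the
# {source_id}, {clean_mydata} and {mydata} parameters come from this DAG.
HQL_CLEAN_DATA_SKETCH = """
DROP TABLE IF EXISTS {clean_mydata};
CREATE TABLE {clean_mydata} AS
SELECT '{source_id}' AS source_id, src.*
FROM {mydata} src
WHERE src.id IS NOT NULL;
"""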

# Push the cleaned Hive data to the downstream relational store.
move_data_mysql = PythonOperator(
    task_id='move_data_mysql',
    python_callable=tasks.move_data_mssql,
    templates_dict={'schema': 'my_hive_db'},
    provide_context=True,
    dag=dag
)
move_data_mysql.set_upstream(clean_data)
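
# For reference only: a hedged sketch of what tasks.move_data_mssql might do,
# assuming a HiveServer2 connection for reads and an MSSQL connection for
# writes. The target table, connection ids and column layout are assumptions.
def move_data_mssql_sketch(templates_dict=None, **context):
    """Copy the cleaned rows from Hive into an MSSQL table."""
    from airflow.hooks.hive_hooks import HiveServer2Hook
    from airflow.hooks.mssql_hook import MsSqlHook
    schema = templates_dict['schema']
    hive = HiveServer2Hook()
    rows = hive.get_records("SELECT * FROM {}.clean_mydata".format(schema))
    mssql = MsSqlHook(mssql_conn_id='mssql_default')
    mssql.insert_rows(table='mydata', rows=rows)
    logger.info("moved %s rows into MSSQL", len(rows))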

# Notify on completion of the ingestion for the run date.
send_email = EmailOperator(
    task_id='send_email',
    to='mehmet.vergili@gmail.com',
    subject='ingestion complete',
    html_content="Date: {{ ds }}",
    dag=dag)

send_email.set_upstream(move_data_mysql)