Advertisement
Guest User

Untitled

a guest
Jun 29th, 2017
505
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.00 KB | None | 0 0
  1. import datetime
  2.  
  3. from airflow.models import DAG
  4. from airflow.hooks.base_hook import BaseHook
  5. from airflow.contrib.operators.vertica_operator import VerticaOperator
  6.  
# Airflow connection ids. Both are looked up with BaseHook.get_connection()
# and passed as ``vertica_conn_id`` to the VerticaOperator tasks in this file.
src_vertica_id = 'fs_source'
dst_vertica_id = 'ga_datamart'
  9.  
  10. def create_task(dag):
  11. params = {
  12. src_vertica_id: BaseHook.get_connection(src_vertica_id),
  13. dst_vertica_id: BaseHook.get_connection(dst_vertica_id),
  14. 'target_table': 'stats_full_daily_test'
  15. }
  16.  
  17. prepare = VerticaOperator(task_id='prepare',
  18. sql='prepare.sql',
  19. retries=0,
  20. params=params,
  21. vertica_conn_id=dst_vertica_id,
  22. on_failure_callback=cleanup_on_failure,
  23. dag=dag)
  24.  
  25. export = VerticaOperator(task_id='export',
  26. sql='export.sql',
  27. retries=0,
  28. params=params,
  29. vertica_conn_id=src_vertica_id,
  30. #on_failure_callback=
  31. dag=dag)
  32.  
  33. moveout = VerticaOperator(task_id='moveout',
  34. sql='moveout.sql',
  35. retries=0,
  36. params=params,
  37. vertica_conn_id=dst_vertica_id,
  38. #on_failure_callback=
  39. #on_success_callback=
  40. dag=dag)
  41.  
  42. cleanup = VerticaOperator(task_id='cleanup',
  43. sql='cleanup.sql',
  44. retries=0,
  45. params=params,
  46. vertica_conn_id=dst_vertica_id,
  47. dag=dag)
  48.  
  49. prepare.set_downstream(export)
  50. export.set_downstream(moveout)
  51. moveout.set_downstream(cleanup)
  52.  
  53. def cleanup_on_failure(context):
  54. dag = context['dag']
  55. params = context['params']
  56.  
  57. cleanup = VerticaOperator(task_id='cleanup_on_failure',
  58. sql='cleanup.sql',
  59. #sql='DROP TABLE IF EXISTS etl.%s_%s CASCADE;' % (params['target_table'], context['ts_nodash']),
  60. retries=0,
  61. params=params,
  62. vertica_conn_id=dst_vertica_id,
  63. dag=dag)
  64. cleanup.execute(context)
  65.  
  66. def create_dag():
  67. dag_id = 'ga_full_stats_test_v2'
  68.  
  69. args = {
  70. 'owner': 'ga',
  71. # 'email': ['k5a7b0y6i7a9u2r6@propellerads.slack.com'],
  72. # 'email_on_failure': True,
  73. 'email_on_retry': True,
  74. 'retries': 5,
  75. 'start_date': datetime.datetime(2017, 06, 05),
  76. }
  77.  
  78. dag = DAG(dag_id=dag_id,
  79. schedule_interval='* */5 * * *',
  80. concurrency=1,
  81. max_active_runs=1,
  82. default_args=args,
  83. catchup=True)
  84.  
  85. create_task(dag)
  86.  
  87. globals()[dag_id] = dag
  88.  
# Build the DAG at import time; create_dag() stores it in this module's
# globals under its dag id.
create_dag()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement