Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- from airflow.models import DAG
- from airflow.hooks.base_hook import BaseHook
- from airflow.contrib.operators.vertica_operator import VerticaOperator
# Airflow connection ids. The source cluster is only read by the 'export'
# task; every other task in this DAG runs against the destination datamart.
src_vertica_id, dst_vertica_id = 'fs_source', 'ga_datamart'
def create_task(dag):
    """Attach the prepare -> export -> moveout -> cleanup chain to *dag*.

    Each step runs the SQL template of the same name with retries disabled,
    and all steps share one ``params`` dict. That dict is keyed by the two
    connection ids and holds the Airflow ``Connection`` objects for them,
    plus the ``target_table`` name. The connections are looked up when this
    function runs, i.e. while the DAG file is being parsed.

    Only ``export`` runs against the source connection. The other three
    steps run against the destination.
    """
    shared_params = {
        src_vertica_id: BaseHook.get_connection(src_vertica_id),
        dst_vertica_id: BaseHook.get_connection(dst_vertica_id),
        'target_table': 'stats_full_daily_test'
    }

    def vertica_step(task_id, sql, conn_id, **extra):
        # Settings common to every step of this chain.
        return VerticaOperator(task_id=task_id,
                               sql=sql,
                               retries=0,
                               params=shared_params,
                               vertica_conn_id=conn_id,
                               dag=dag,
                               **extra)

    steps = [
        # Only 'prepare' has a failure callback.
        # NOTE(review): a failure in 'export' or 'moveout' leaves 'cleanup'
        # upstream_failed and runs no callback, so nothing is cleaned up in
        # that case. Confirm this is intended.
        vertica_step('prepare', 'prepare.sql', dst_vertica_id,
                     on_failure_callback=cleanup_on_failure),
        vertica_step('export', 'export.sql', src_vertica_id),
        vertica_step('moveout', 'moveout.sql', dst_vertica_id),
        vertica_step('cleanup', 'cleanup.sql', dst_vertica_id),
    ]

    # Wire the steps into a strictly linear chain, in list order.
    for upstream, downstream in zip(steps, steps[1:]):
        upstream.set_downstream(downstream)
def cleanup_on_failure(context):
    """Run the ``cleanup.sql`` template on the destination Vertica after a failure.

    Intended as an ``on_failure_callback``. Airflow calls it with the failed
    task instance's template context.

    Templated operator fields are rendered by the TaskInstance machinery
    just before a normal task executes. A callback that calls ``execute()``
    directly skips that step, so passing ``sql='cleanup.sql'`` would send the
    literal string "cleanup.sql" to Vertica. The template is therefore
    rendered here with the DAG's own Jinja environment (same search path and
    user macros as regular tasks) and the failed task's context.

    NOTE(review): cleanup.sql presumably drops the per-run staging table. An
    earlier inline version of this callback used
    ``DROP TABLE IF EXISTS etl.<target_table>_<ts_nodash> CASCADE;``.
    Confirm against the template.

    :param context: Airflow template context of the failed task instance.
    """
    dag = context['dag']
    params = context['params']
    sql = dag.get_template_env().get_template('cleanup.sql').render(**context)
    # Deliberately not attached to the DAG (no dag=...). This is a one-off
    # helper, and registering a task id at callback time would mutate the
    # parsed DAG. Newer Airflow versions can also reject a duplicate task id
    # if the callback fires more than once in one process.
    cleanup = VerticaOperator(task_id='cleanup_on_failure',
                              sql=sql,
                              retries=0,
                              params=params,
                              vertica_conn_id=dst_vertica_id)
    cleanup.execute(context)
def create_dag():
    """Build the ``ga_full_stats_test_v2`` DAG and publish it at module scope.

    Airflow discovers DAGs by scanning a module's globals. The DAG is
    therefore stored in ``globals()`` under its ``dag_id`` rather than being
    returned.
    """
    dag_id = 'ga_full_stats_test_v2'
    args = {
        'owner': 'ga',
        # 'email': ['k5a7b0y6i7a9u2r6@propellerads.slack.com'],
        # 'email_on_failure': True,
        'email_on_retry': True,
        # Every task built in create_task() passes retries=0, which
        # overrides this default.
        'retries': 5,
        # Plain decimal literals: ``06``/``05`` are octal literals in
        # Python 2 and a SyntaxError in Python 3. The date is the same.
        'start_date': datetime.datetime(2017, 6, 5),
    }
    dag = DAG(dag_id=dag_id,
              # Minute 0 of every 5th hour. The previous '* */5 * * *' had '*'
              # in the minute field, so it fired on every minute of hours
              # 0, 5, 10, 15 and 20 (60 runs per slot, all backfilled because
              # of catchup=True).
              # TODO(review): confirm "every 5 hours" is the intended cadence.
              schedule_interval='0 */5 * * *',
              concurrency=1,
              max_active_runs=1,
              default_args=args,
              catchup=True)
    create_task(dag)
    globals()[dag_id] = dag


create_dag()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement