Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ## Third party Library Imports
- import pandas as pd
- import psycopg2
- import airflow
- from airflow import DAG
- from airflow.operators import BashOperator
- from datetime import datetime, timedelta
- from sqlalchemy import create_engine
- import io
# Following are defaults which can be overridden later on (per task or when
# the DAG is instantiated).
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 21),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# Every run deletes and reloads all of tbl1 without reference to its execution
# date. Back-filling the runs between start_date and today would only repeat
# the same full reload, so catch-up is disabled.
dag = DAG('dwh_sample23', default_args=default_args, catchup=False)
#######################
## Login to DB
def db_login(dsn=None):
    """Connect to the Data Warehouse and return the connection.

    The connection is also stored in the module-level global
    ``dwh_connection``, which ``tbl1_del`` and ``pop_tbl1`` use.

    Args:
        dsn: connection string passed to ``psycopg2.connect``. When omitted,
            the ``DWH_DSN`` environment variable is used if set, otherwise
            the built-in connection string below.

    Returns:
        The open psycopg2 connection.

    Raises:
        psycopg2.Error: if the connection cannot be established.
    """
    import os

    global dwh_connection
    if dsn is None:
        # Let deployments supply credentials through the environment instead
        # of relying on the hard-coded string.
        dsn = os.environ.get(
            "DWH_DSN",
            " dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ",
        )
    try:
        dwh_connection = psycopg2.connect(dsn)
    except psycopg2.Error:
        # Report and propagate. Carrying on would print 'Success' and return a
        # connection that was never opened.
        print("I am unable to connect to the database.")
        raise
    print('Success')
    return dwh_connection
def tbl1_del(connection=None):
    """Delete every row from ``tbl1`` and commit.

    Args:
        connection: DB-API connection to use. Defaults to the module-level
            ``dwh_connection`` opened by ``db_login()``.
    """
    from contextlib import closing

    conn = dwh_connection if connection is None else connection
    # ``with conn`` commits when the block succeeds and rolls back if the
    # statement raises. ``closing`` guarantees the cursor is released either way.
    with conn, closing(conn.cursor()) as cur:
        cur.execute("""DELETE FROM tbl1;""")
def pop_tbl1(connection=None):
    """Copy ``id``, ``name`` and ``price`` from every row of ``tbl2`` into ``tbl1`` and commit.

    The INSERT has no column list, so the selected columns are matched to
    tbl1's columns by position.

    Args:
        connection: DB-API connection to use. Defaults to the module-level
            ``dwh_connection`` opened by ``db_login()``.
    """
    from contextlib import closing

    conn = dwh_connection if connection is None else connection
    # ``with conn`` commits when the block succeeds and rolls back if the
    # statement raises. ``closing`` guarantees the cursor is released either way.
    with conn, closing(conn.cursor()) as cur:
        cur.execute(""" INSERT INTO tbl1
    select id,name,price from tbl2;""")
if __name__ == "__main__":
    # Run the reload only when this file is executed as a script, which is
    # what the BashOperator tasks' bash_command does. Airflow also imports DAG
    # files when parsing them, and unguarded top-level statements would delete
    # and repopulate tbl1 on every parse.
    db_login()
    try:
        tbl1_del()
        pop_tbl1()
    finally:
        # Always release the connection, even if a step fails.
        dwh_connection.close()
##########################################
# Each task executes this file as a script, and running the script performs
# the full reload (db_login -> tbl1_del -> pop_tbl1). The task_ids therefore
# name intended steps but do not restrict what each task runs.
# The former ``python_callable=db_login()`` style arguments were removed:
# - they called the functions while the DAG file was being parsed;
# - they left an extra connection open;
# - python_callable is not a BashOperator parameter.
# NOTE(review): to run exactly one step per task, switch to PythonOperator
# with python_callable=<function> (no parentheses) and have each function
# open its own connection, since tasks run in separate processes.
t1 = BashOperator(
    task_id='DB_Connect',
    bash_command='python3 ~/airflow/dags/dwh_sample23.py',
    dag=dag)
t2 = BashOperator(
    task_id='del',
    bash_command='python3 ~/airflow/dags/dwh_sample23.py',
    dag=dag)
t3 = BashOperator(
    task_id='populate',
    bash_command='python3 ~/airflow/dags/dwh_sample23.py',
    dag=dag)

# Run order: DB_Connect -> del -> populate.
t1.set_downstream(t2)
t2.set_downstream(t3)
Add Comment
Please, Sign In to add comment