Guest User

Untitled

a guest
May 24th, 2018
137
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.79 KB | None | 0 0
  1. ## Third party Library Imports
  2. import pandas as pd
  3. import psycopg2
  4. import airflow
  5. from airflow import DAG
  6. from airflow.operators import BashOperator
  7. from datetime import datetime, timedelta
  8. from sqlalchemy import create_engine
  9. import io
  10.  
  11.  
  12. # Following are defaults which can be overridden later on
  13. default_args = {
  14. 'owner': 'admin',
  15. 'depends_on_past': False,
  16. 'start_date': datetime(2018, 5, 21),
  17. 'retries': 1,
  18. 'retry_delay': timedelta(minutes=1),
  19. }
  20.  
  21. dag = DAG('dwh_sample23', default_args=default_args)
  22.  
  23.  
  24. #######################
  25. ## Login to DB
  26.  
  27. def db_login():
  28. ''' This function connects to the Data Warehouse and returns the cursor to execute queries '''
  29. global dwh_connection
  30. try:
  31. dwh_connection = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
  32. except:
  33. print("I am unable to connect to the database.")
  34. print('Success')
  35. return(dwh_connection)
  36.  
  37. def tbl1_del():
  38. ''' This function takes clears all rows from tbl1 '''
  39. cur = dwh_connection.cursor()
  40. cur.execute("""DELETE FROM tbl1;""")
  41. dwh_connection.commit()
  42.  
  43.  
  44. def pop_tbl1():
  45. ''' This function populates all rows in tbl1 '''
  46. cur = dwh_connection.cursor()
  47. cur.execute(""" INSERT INTO tbl1
  48. select id,name,price from tbl2;""")
  49. dwh_connection.commit()
  50.  
  51.  
  52.  
  53. db_login()
  54. tbl1_del()
  55. pop_tbl1()
  56. dwh_connection.close()
  57.  
  58. ##########################################
  59.  
  60.  
  61. t1 = BashOperator(
  62. task_id='DB_Connect',
  63. python_callable=db_login(),
  64. bash_command='python3 ~/airflow/dags/dwh_sample23.py',
  65. dag=dag)
  66.  
  67. t2 = BashOperator(
  68. task_id='del',
  69. python_callable=tbl1_del(),
  70. bash_command='python3 ~/airflow/dags/dwh_sample23.py',
  71. dag=dag)
  72.  
  73.  
  74. t3 = BashOperator(
  75. task_id='populate',
  76. python_callable=pop_tbl1(),
  77. bash_command='python3 ~/airflow/dags/dwh_sample23.py',
  78. dag=dag)
  79.  
  80.  
  81. t1.set_downstream(t2)
  82. t2.set_downstream(t3)
Add Comment
Please, Sign In to add comment