Advertisement
Guest User

Untitled

a guest
Jul 18th, 2019
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.55 KB | None | 0 0
  1. import requests
  2.  
  3. from airflow import DAG
  4. from airflow.hooks.postgres_hook import PostgresHook
  5. from airflow.operators.python_operator import PythonOperator
  6. from datetime import datetime, timedelta
  7.  
  8.  
  9. URL = 'https://automatetheboringstuff.com/files/rj.txt'
  10.  
  11. def get_response(url):
  12.     response = requests.get(url)    
  13.  
  14.     return response.status_code
  15.  
  16. def load_data(**kwargs):
  17.  
  18.     ti = kwargs['ti']
  19.     pg_hook = PostgresHook(postrgres_conn_id='stantions')
  20.  
  21.     insert_cmd = """ INSERT INTO stantions
  22.     (status, hearthbeat)
  23.     VALUES
  24.     (%s, %s);"""
  25.  
  26.     st = ti.xcom_pull(task_ids='task1')
  27.     execution_time = datetime.now()
  28.     row = st, execution_time
  29.  
  30.     pg_hook.run(insert_cmd, parameters= row)
  31.  
  32. default_args = {
  33.     'owner': 'airflow',
  34.     'depends_on_past': False,
  35.     'start_date': datetime(2019, 7, 17),
  36.     'end_date': datetime(2019, 7, 19),
  37.     'email': ['airflow@airflow.com'],
  38.     'email_on_failure': False,
  39.     'email_on_retry': False,
  40.     'retries': 2,
  41.     'retry_delay': timedelta(minutes=3),
  42. }
  43.  
  44. #Use cron to run the DAG every 30 minutes.
  45. schedule_interval = "0,30 * * * *"
  46.  
  47. #Define DAG
  48. dag = DAG(
  49.     'get_response_and_save_it',
  50.     default_args= default_args,
  51.     schedule_interval= schedule_interval
  52.     )
  53.  
  54. # Task 1: get the response code from the URL
  55. t1 = PythonOperator(
  56.     task_id = 'task1',
  57.     python_callable= get_response,
  58.     provide_context= False,
  59.     op_kwargs = {'url':URL},
  60.     dag=dag
  61.     )
  62.  
  63. # Task 2: insert into postrgress
  64. t2 = PythonOperator(
  65.     task_id= 'task2',
  66.     python_callable= load_data,
  67.     provide_context= True,
  68.     # op_kwargs = [],
  69.     dag= dag
  70.     )
  71.  
  72. t1 >> t2
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement