# Airflow DAG: fetch an HTTP status code every 30 minutes and store it in Postgres.
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator

# Text file whose HTTP status code this DAG records on every run.
URL = 'https://automatetheboringstuff.com/files/rj.txt'


def get_response(url, timeout=30):
    """Return the HTTP status code of a GET request to *url*.

    Args:
        url: Address to request.
        timeout: Seconds to wait before giving up (new, defaulted — keeps
            existing callers working). Without a timeout, ``requests.get``
            can block forever and tie up the Airflow worker slot.

    Returns:
        int: The HTTP status code of the response.
    """
    response = requests.get(url, timeout=timeout)
    return response.status_code
def load_data(**kwargs):
    """Insert the status code produced by ``task1`` into Postgres.

    Pulls the value from XCom and inserts it, together with the current
    timestamp, into the ``stantions`` table via the ``stantions`` connection.

    Args:
        **kwargs: Airflow context (requires ``provide_context=True``);
            only ``kwargs['ti']`` (the TaskInstance) is used.
    """
    ti = kwargs['ti']
    # BUG FIX: the kwarg was misspelled as 'postrgres_conn_id', so the hook
    # silently ignored it and fell back to the default Postgres connection
    # instead of the 'stantions' connection.
    pg_hook = PostgresHook(postgres_conn_id='stantions')
    insert_cmd = """ INSERT INTO stantions
                     (status, hearthbeat)
                     VALUES
                     (%s, %s);"""
    st = ti.xcom_pull(task_ids='task1')
    # NOTE(review): naive local time, not UTC — presumably intentional; confirm.
    execution_time = datetime.now()
    row = st, execution_time
    pg_hook.run(insert_cmd, parameters=row)
# Defaults applied to every task in this DAG: two retries, three minutes
# apart, and no e-mail notifications.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=datetime(2019, 7, 17),
    end_date=datetime(2019, 7, 19),
    email=['airflow@airflow.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=2,
    retry_delay=timedelta(minutes=3),
)

# Cron schedule: fire at minute 0 and minute 30 of every hour.
schedule_interval = "0,30 * * * *"
# --- DAG and task wiring --------------------------------------------------
dag = DAG(
    'get_response_and_save_it',
    default_args=default_args,
    schedule_interval=schedule_interval,
)

# Task 1: request URL and push its HTTP status code to XCom.
t1 = PythonOperator(
    task_id='task1',
    python_callable=get_response,
    provide_context=False,
    op_kwargs={'url': URL},
    dag=dag,
)

# Task 2: pull the status code from XCom and insert it into Postgres.
t2 = PythonOperator(
    task_id='task2',
    python_callable=load_data,
    provide_context=True,
    dag=dag,
)

# Fetch first, then persist (equivalent to `t1 >> t2`).
t1.set_downstream(t2)