Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from datetime import timedelta, datetime
- import os
- from airflow import DAG
- from airflow.operators.dummy_operator import DummyOperator
- from airflow.operators.python_operator import PythonOperator
- # import pyping
- # Default args for DAG.
- default_args = {
- 'description': 'Determines whether or not an IP address is active',
- 'depends_on_past': False,
- 'start_date': datetime(2020, 4, 21),
- 'catchup': False,
- 'email': ['email@email.com'],
- 'email_on_failure': True,
- 'email_on_retry': False,
- 'retry_delay': timedelta(minutes=5),
- }
- # DAG schedule interval, defined by CRON.
- schedule_interval = '0 0 * * *' # Once a day
- # Python def for the ping_ip_task.
- def ping_ip():
- ip_address = "000.000.000.00.00" # My laptop IP
- response = os.system("ping -c 1 " + ip_address)
- if response == 0:
- pingstatus = "Network Active."
- else:
- pingstatus = "Network Error."
- print("\n *** Network status for IP Address=%s is : ***" % ip_address)
- print(pingstatus)
- return pingstatus, response
- # Open DAG, give it an ID, schedule, default_args.
- with DAG(dag_id='ip_ping', schedule_interval=schedule_interval, default_args=default_args) as dag:
- # Define a Dummy task that does nothing.
- dummy_task = DummyOperator(task_id='dummy_task', retries=3)
- # Define a task that returns the network status of an IP Address.
- ping_ip_task = PythonOperator(task_id='ping_ip', python_callable=ping_ip, retries=1)
- # Declare order of task execution.
- dummy_task >> ping_ip_task
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement