Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import re
- from os.path import expanduser
- from datetime import datetime, timedelta
- from airflow import DAG
- from airflow.utils import timezone
- from airflow.contrib.hooks.ssh_hook import SSHHook
- from airflow.contrib.operators.ssh_operator import SSHOperator
- from bw_config_tools.connect.bw_config import ConfigDbClient
# JSON file whose contents are passed as keyword arguments to ConfigDbClient.
CONFIG_DB_INFO = '/etc/airflow/config_db_info.json'
# Suffix appended to each instance directory to locate its launcher script.
START_SCRIPT = '/bin/start.sh'

# Fixed start date in the past. NOTE(review): the datetime is naive, so
# Airflow's convert_to_utc presumably interprets it in the configured default
# timezone before converting — confirm.
TIME_IN_PAST = timezone.convert_to_utc(datetime(2019, 2, 14, 15, 0))

# Arguments applied to every task created with this DAG.
DEFAULT_ARGS = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=TIME_IN_PAST,
    email=['example@domain.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    retry_delay=timedelta(minutes=1),
)
def _extract_instance_id(instance_string):
    """Return the first run of ASCII digits in *instance_string*.

    For a config key such as ``solr-mentions-cleanup.12.dir`` this returns
    ``'12'``, the numeric instance id, as a string.

    Raises:
        ValueError: if *instance_string* contains no digits.
    """
    # [0-9] rather than a bare 'd': the previous pattern r'd+' matched the
    # letter "d" (e.g. the "d" in ".dir"), so every key yielded the same id.
    # [0-9] also keeps this ASCII-only, matching the config query pattern.
    match = re.search(r'[0-9]+', instance_string)
    if match is None:
        raise ValueError(
            'No instance id found in config key {0!r}'.format(instance_string))
    return match.group(0)
def _read_file_as_json(file_name):
    """Load and return the JSON document stored at *file_name*.

    The file is decoded explicitly as UTF-8 rather than with the platform's
    locale-dependent default encoding, so non-ASCII content is read the same
    way on every host. ``io.open`` is used because it accepts ``encoding`` on
    both Python 2 and Python 3.

    Raises:
        IOError/OSError: if the file cannot be opened.
        ValueError: if the content is not valid JSON.
    """
    import io  # local import keeps this fix self-contained

    with io.open(file_name, encoding='utf-8') as json_file:
        return json.load(json_file)
def _start_script_paths(app_dirs):
    """Map the instance id in each key of *app_dirs* to its start script path.

    *app_dirs* maps config keys to instance directories. The script path is
    the directory with START_SCRIPT appended by plain string concatenation
    (START_SCRIPT starts with '/', so os.path.join would drop the directory).
    If two keys yield the same id, the later entry wins.
    """
    paths = {}
    for config_key, app_dir in app_dirs.items():
        instance_id = _extract_instance_id(config_key)
        paths[instance_id] = app_dir + START_SCRIPT
    return paths


# Config DB connection parameters. This runs every time the DAG file is
# imported (each DAG parse), not once per task run.
DB_INFO = _read_file_as_json(CONFIG_DB_INFO)
CONFIG_CLIENT = ConfigDbClient(**DB_INFO)
print('Config DB client: {0}'.format(CONFIG_CLIENT))

# NOTE(review): the '%...[0-9]+...%' pattern with strictness='similar' looks
# like a SQL SIMILAR TO style match — confirm against bw_config_tools.
APP_DIRS = CONFIG_CLIENT.get_values(
    '%solr-mentions-cleanup.[0-9]+.dir%', strictness='similar')
INSTANCE_START_SCRIPT_PATHS = _start_script_paths(APP_DIRS)
# SSH hook backed by the Airflow connection 'solr-mentions-cleanups' (host
# and login come from that stored connection), authenticating with the
# airflow user's private key.
SSH_HOOK = SSHHook(
    ssh_conn_id='solr-mentions-cleanups',
    key_file='/home/airflow/.ssh/id_rsa',
)

# NOTE: this rebinding shadows the imported airflow.DAG class for the rest of
# the module; the module-level name DAG is kept because tasks pass dag=DAG.
DAG = DAG(
    'solr-mentions-cleanups',
    default_args=DEFAULT_ARGS,
    schedule_interval='* * * * *',  # cron expression: every minute
    catchup=False,  # do not backfill intervals missed since start_date
)
# Register one SSHOperator per discovered instance. Each task runs that
# instance's start script over SSH with the "disabled-queries --delete" args.
for inst_id, script_path in INSTANCE_START_SCRIPT_PATHS.items():
    cleanup_task_id = 'run-solr-mentions-cleanups-{0}'.format(inst_id)
    cleanup_command = 'bash {0} disabled-queries --delete'.format(script_path)
    task = SSHOperator(
        task_id=cleanup_task_id,
        command=cleanup_command,
        ssh_hook=SSH_HOOK,
        dag=DAG,
    )
- (venv) airflow@some_host ~ # airflow run solr-mentions-cleanups run-solr-mentions-cleanups-0 2019-02-14
- [2019-02-20 12:38:51,313] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16375)
- [2019-02-20 12:38:51,313] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=3600, pid=16375
- [2019-02-20 12:38:51,491] {__init__.py:42} DEBUG - Cannot import due to doesn't look like a module path
- [2019-02-20 12:38:51,645] {__init__.py:51} INFO - Using executor LocalExecutor
- [2019-02-20 12:38:51,654] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f0364fdc8c8> to pre execution callback
- [2019-02-20 12:38:51,930] {cli_action_loggers.py:64} DEBUG - Calling callbacks: [<function default_action_log at 0x7f0364fdc8c8>]
- [2019-02-20 12:38:51,974] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16375)
- [2019-02-20 12:38:51,974] {settings.py:154} DEBUG - settings.configure_orm(): Using NullPool
- [2019-02-20 12:38:51,976] {models.py:273} INFO - Filling up the DagBag from /etc/airflow/dags
- [2019-02-20 12:38:51,978] {models.py:360} INFO - File /etc/airflow/dags/__init__.py assumed to contain no DAGs. Skipping.
- [2019-02-20 12:38:51,978] {models.py:363} DEBUG - Importing /etc/airflow/dags/hbase-exports.py
- [2019-02-20 12:38:51,983] {models.py:501} DEBUG - Loaded DAG <DAG: hbase-daily-export>
- [2019-02-20 12:38:51,984] {models.py:363} DEBUG - Importing /etc/airflow/dags/test_dag.py
- [2019-02-20 12:38:51,985] {models.py:501} DEBUG - Loaded DAG <DAG: test_dag>
- [2019-02-20 12:38:51,986] {models.py:363} DEBUG - Importing /etc/airflow/dags/solr_mentions_cleanup.py
- Creating dag
- Config DB client: <bw_config_tools.connect.bw_config.ConfigDbClient object at 0x7f032739b4e0>
- The key file given is /home/airflow/.ssh/id_rsa
- [2019-02-20 12:38:52,196] {base_hook.py:83} INFO - Using connection to: id: solr-mentions-cleanups. Host: some_host, Port: None, Schema: None, Login: some_user, Password: None, extra: {}
- extra connection info given:
- Key file in extra options: None
- SSH config file being used is /home/airflow/.ssh/config
- [2019-02-20 12:38:52,198] {models.py:501} DEBUG - Loaded DAG <DAG: solr-mentions-cleanups>
- [2019-02-20 12:38:52,251] {cli.py:520} INFO - Running <TaskInstance: solr-mentions-cleanups.run-solr-mentions-cleanups-0 2019-02-14T00:00:00+00:00 [success]> on host xxxxxx.net
- [2019-02-20 12:38:54,026] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16453)
- [2019-02-20 12:38:54,027] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=3600, pid=16453
- [2019-02-20 12:38:54,207] {__init__.py:42} DEBUG - Cannot import due to doesn't look like a module path
- [2019-02-20 12:38:54,362] {__init__.py:51} INFO - Using executor LocalExecutor
- [2019-02-20 12:38:54,371] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f4345cfa8c8> to pre execution callback
- [2019-02-20 12:38:54,622] {cli_action_loggers.py:64} DEBUG - Calling callbacks: [<function default_action_log at 0x7f4345cfa8c8>]
- [2019-02-20 12:38:54,658] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16453)
- [2019-02-20 12:38:54,658] {settings.py:154} DEBUG - settings.configure_orm(): Using NullPool
- [2019-02-20 12:38:54,660] {models.py:273} INFO - Filling up the DagBag from /etc/airflow/dags/solr_mentions_cleanup.py
- [2019-02-20 12:38:54,662] {models.py:363} DEBUG - Importing /etc/airflow/dags/solr_mentions_cleanup.py
- Config DB client: <bw_config_tools.connect.bw_config.ConfigDbClient object at 0x7f4308b5dc50>
- The key file given is /home/airflow/.ssh/id_rsa
- [2019-02-20 12:38:54,909] {base_hook.py:83} INFO - Using connection to: id: solr-mentions-cleanups. Host: some_host, Port: None, Schema: None, Login: some_user, Password: None, extra: {}
- extra connection info given:
- Key file in extra options: None
- SSH config file being used is /home/airflow/.ssh/config
- [2019-02-20 12:38:54,912] {models.py:501} DEBUG - Loaded DAG <DAG: solr-mentions-cleanups>
- [2019-02-20 12:38:54,961] {cli.py:520} INFO - Running <TaskInstance: solr-mentions-cleanups.run-solr-mentions-cleanups-0 2019-02-14T00:00:00+00:00 [success]> on host xxxx.net
- [2019-02-20 12:38:55,054] {cli_action_loggers.py:81} DEBUG - Calling callbacks: []
- [2019-02-20 12:38:55,054] {settings.py:201} DEBUG - Disposing DB connection pool (PID 16453)
- [2019-02-20 12:38:56,310] {cli_action_loggers.py:81} DEBUG - Calling callbacks: []
- [2019-02-20 12:38:56,313] {settings.py:201} DEBUG - Disposing DB connection pool (PID 16375)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement