SHARE
TWEET

Untitled

a guest Feb 20th, 2019 134 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import json
  2. import re
  3. from os.path import expanduser
  4. from datetime import datetime, timedelta
  5. from airflow import DAG
  6. from airflow.utils import timezone
  7. from airflow.contrib.hooks.ssh_hook import SSHHook
  8. from airflow.contrib.operators.ssh_operator import SSHOperator
  9. from bw_config_tools.connect.bw_config import ConfigDbClient
  10.  
# Path to a JSON file whose contents are the keyword arguments for ConfigDbClient.
CONFIG_DB_INFO = '/etc/airflow/config_db_info.json'
# Script name appended to each instance directory to form the command to run.
START_SCRIPT = '/bin/start.sh'
# Fixed start date in the past (UTC) so the DAG is schedulable as soon as it is enabled.
TIME_IN_PAST = timezone.convert_to_utc(datetime(2019, 2, 14, 15, 00))

# Default task arguments shared by every task in the DAG defined below.
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': TIME_IN_PAST,
    'email': ['example@domain.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    # Only takes effect if retries > 0; kept so retries can be re-enabled easily.
    'retry_delay': timedelta(minutes=1),
}
  25.  
  26. def _extract_instance_id(instance_string):
  27.     return re.findall(r'd+', instance_string)[0]
  28.  
  29. def _read_file_as_json(file_name):
  30.     with open(file_name) as open_file:
  31.         return json.load(open_file)
  32.  
  33. DB_INFO = _read_file_as_json(CONFIG_DB_INFO)
  34. CONFIG_CLIENT = ConfigDbClient(**DB_INFO)
  35.  
  36. print('Config DB client: {0}'.format(CONFIG_CLIENT))
  37.  
  38. APP_DIRS = CONFIG_CLIENT.get_values('%solr-mentions-cleanup.[0-9]+.dir%', strictness='similar')
  39.  
  40. INSTANCE_START_SCRIPT_PATHS = {
  41.     _extract_instance_id(instance_string): directory+START_SCRIPT
  42.     for instance_string, directory in APP_DIRS.items()
  43.     }
  44.  
  45. # Create an ssh hook which refers to pre-existing connection information
  46. # setup and stored by airflow
  47. SSH_HOOK = SSHHook(ssh_conn_id='solr-mentions-cleanups', key_file='/home/airflow/.ssh/id_rsa')
  48.  
  49. # Create a DAG object to add tasks to
  50. DAG = DAG('solr-mentions-cleanups',
  51.           default_args=DEFAULT_ARGS,
  52.           schedule_interval='* * * * *'
  53.           )
  54.  
  55. DAG.catchup = False
  56.  
  57. # Create a task for each solr-mentions-cleanup instance.
  58. for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
  59.     task = SSHOperator(
  60.         task_id='run-solr-mentions-cleanups-{0}'.format(instance_id),
  61.         command='bash {0}  disabled-queries --delete'.format(start_script),
  62.         ssh_hook=SSH_HOOK,
  63.         dag=DAG)
  64.      
  65. (venv) airflow@some_host ~ # airflow run solr-mentions-cleanups run-solr-mentions-cleanups-0 2019-02-14
  66. [2019-02-20 12:38:51,313] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16375)
  67. [2019-02-20 12:38:51,313] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=3600, pid=16375
  68. [2019-02-20 12:38:51,491] {__init__.py:42} DEBUG - Cannot import  due to  doesn't look like a module path
  69. [2019-02-20 12:38:51,645] {__init__.py:51} INFO - Using executor LocalExecutor
  70. [2019-02-20 12:38:51,654] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f0364fdc8c8> to pre execution callback
  71. [2019-02-20 12:38:51,930] {cli_action_loggers.py:64} DEBUG - Calling callbacks: [<function default_action_log at 0x7f0364fdc8c8>]
  72. [2019-02-20 12:38:51,974] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16375)
  73. [2019-02-20 12:38:51,974] {settings.py:154} DEBUG - settings.configure_orm(): Using NullPool
  74. [2019-02-20 12:38:51,976] {models.py:273} INFO - Filling up the DagBag from /etc/airflow/dags
  75. [2019-02-20 12:38:51,978] {models.py:360} INFO - File /etc/airflow/dags/__init__.py assumed to contain no DAGs. Skipping.
  76. [2019-02-20 12:38:51,978] {models.py:363} DEBUG - Importing /etc/airflow/dags/hbase-exports.py
  77. [2019-02-20 12:38:51,983] {models.py:501} DEBUG - Loaded DAG <DAG: hbase-daily-export>
  78. [2019-02-20 12:38:51,984] {models.py:363} DEBUG - Importing /etc/airflow/dags/test_dag.py
  79. [2019-02-20 12:38:51,985] {models.py:501} DEBUG - Loaded DAG <DAG: test_dag>
  80. [2019-02-20 12:38:51,986] {models.py:363} DEBUG - Importing /etc/airflow/dags/solr_mentions_cleanup.py
  81. Creating dag
  82. Config DB client: <bw_config_tools.connect.bw_config.ConfigDbClient object at 0x7f032739b4e0>
  83. The key file given is /home/airflow/.ssh/id_rsa
  84. [2019-02-20 12:38:52,196] {base_hook.py:83} INFO - Using connection to: id: solr-mentions-cleanups. Host: some_host, Port: None, Schema: None, Login: some_user, Password: None, extra: {}
  85. extra connection info given:
  86. Key file in extra options:  None
  87. SSH config file being used is  /home/airflow/.ssh/config
  88. [2019-02-20 12:38:52,198] {models.py:501} DEBUG - Loaded DAG <DAG: solr-mentions-cleanups>
  89. [2019-02-20 12:38:52,251] {cli.py:520} INFO - Running <TaskInstance: solr-mentions-cleanups.run-solr-mentions-cleanups-0 2019-02-14T00:00:00+00:00 [success]> on host xxxxxx.net
  90. [2019-02-20 12:38:54,026] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16453)
  91. [2019-02-20 12:38:54,027] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=3600, pid=16453
  92. [2019-02-20 12:38:54,207] {__init__.py:42} DEBUG - Cannot import  due to  doesn't look like a module path
  93. [2019-02-20 12:38:54,362] {__init__.py:51} INFO - Using executor LocalExecutor
  94. [2019-02-20 12:38:54,371] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f4345cfa8c8> to pre execution callback
  95. [2019-02-20 12:38:54,622] {cli_action_loggers.py:64} DEBUG - Calling callbacks: [<function default_action_log at 0x7f4345cfa8c8>]
  96. [2019-02-20 12:38:54,658] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16453)
  97. [2019-02-20 12:38:54,658] {settings.py:154} DEBUG - settings.configure_orm(): Using NullPool
  98. [2019-02-20 12:38:54,660] {models.py:273} INFO - Filling up the DagBag from /etc/airflow/dags/solr_mentions_cleanup.py
  99. [2019-02-20 12:38:54,662] {models.py:363} DEBUG - Importing /etc/airflow/dags/solr_mentions_cleanup.py
  100.  
  101. Config DB client: <bw_config_tools.connect.bw_config.ConfigDbClient object at 0x7f4308b5dc50>
  102. The key file given is /home/airflow/.ssh/id_rsa
  103. [2019-02-20 12:38:54,909] {base_hook.py:83} INFO - Using connection to: id: solr-mentions-cleanups. Host: some_host, Port: None, Schema: None, Login: some_user, Password: None, extra: {}
  104. extra connection info given:
  105. Key file in extra options:  None
  106. SSH config file being used is  /home/airflow/.ssh/config
  107. [2019-02-20 12:38:54,912] {models.py:501} DEBUG - Loaded DAG <DAG: solr-mentions-cleanups>
  108. [2019-02-20 12:38:54,961] {cli.py:520} INFO - Running <TaskInstance: solr-mentions-cleanups.run-solr-mentions-cleanups-0 2019-02-14T00:00:00+00:00 [success]> on host xxxx.net
  109. [2019-02-20 12:38:55,054] {cli_action_loggers.py:81} DEBUG - Calling callbacks: []
  110. [2019-02-20 12:38:55,054] {settings.py:201} DEBUG - Disposing DB connection pool (PID 16453)
  111. [2019-02-20 12:38:56,310] {cli_action_loggers.py:81} DEBUG - Calling callbacks: []
  112. [2019-02-20 12:38:56,313] {settings.py:201} DEBUG - Disposing DB connection pool (PID 16375)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top