Guest User


a guest
Feb 20th, 2019
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.48 KB | None | 0 0
  1. import json
  2. import re
  3. from os.path import expanduser
  4. from datetime import datetime, timedelta
  5. from airflow import DAG
  6. from airflow.utils import timezone
  7. from airflow.contrib.hooks.ssh_hook import SSHHook
  8. from airflow.contrib.operators.ssh_operator import SSHOperator
  9. from bw_config_tools.connect.bw_config import ConfigDbClient
  11. CONFIG_DB_INFO = '/etc/airflow/config_db_info.json'
  12. START_SCRIPT = '/bin/'
  13. TIME_IN_PAST = timezone.convert_to_utc(datetime(2019, 2, 14, 15, 00))
  15. DEFAULT_ARGS = {
  16. 'owner': 'airflow',
  17. 'depends_on_past': False,
  18. 'start_date': TIME_IN_PAST,
  19. 'email': [''],
  20. 'email_on_failure': False,
  21. 'email_on_retry': False,
  22. 'retries': 0,
  23. 'retry_delay': timedelta(minutes=1),
  24. }
  26. def _extract_instance_id(instance_string):
  27. return re.findall(r'd+', instance_string)[0]
  29. def _read_file_as_json(file_name):
  30. with open(file_name) as open_file:
  31. return json.load(open_file)
  33. DB_INFO = _read_file_as_json(CONFIG_DB_INFO)
  34. CONFIG_CLIENT = ConfigDbClient(**DB_INFO)
  36. print('Config DB client: {0}'.format(CONFIG_CLIENT))
  38. APP_DIRS = CONFIG_CLIENT.get_values('%solr-mentions-cleanup.[0-9]+.dir%', strictness='similar')
  41. _extract_instance_id(instance_string): directory+START_SCRIPT
  42. for instance_string, directory in APP_DIRS.items()
  43. }
  45. # Create an ssh hook which refers to pre-existing connection information
  46. # setup and stored by airflow
  47. SSH_HOOK = SSHHook(ssh_conn_id='solr-mentions-cleanups', key_file='/home/airflow/.ssh/id_rsa')
  49. # Create a DAG object to add tasks to
  50. DAG = DAG('solr-mentions-cleanups',
  51. default_args=DEFAULT_ARGS,
  52. schedule_interval='* * * * *'
  53. )
  55. DAG.catchup = False
  57. # Create a task for each solr-mentions-cleanup instance.
  58. for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
  59. task = SSHOperator(
  60. task_id='run-solr-mentions-cleanups-{0}'.format(instance_id),
  61. command='bash {0} disabled-queries --delete'.format(start_script),
  62. ssh_hook=SSH_HOOK,
  63. dag=DAG)
  65. (venv) airflow@some_host ~ # airflow run solr-mentions-cleanups run-solr-mentions-cleanups-0 2019-02-14
  66. [2019-02-20 12:38:51,313] {} DEBUG - Setting up DB connection pool (PID 16375)
  67. [2019-02-20 12:38:51,313] {} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=3600, pid=16375
  68. [2019-02-20 12:38:51,491] {} DEBUG - Cannot import due to doesn't look like a module path
  69. [2019-02-20 12:38:51,645] {} INFO - Using executor LocalExecutor
  70. [2019-02-20 12:38:51,654] {} DEBUG - Adding <function default_action_log at 0x7f0364fdc8c8> to pre execution callback
  71. [2019-02-20 12:38:51,930] {} DEBUG - Calling callbacks: [<function default_action_log at 0x7f0364fdc8c8>]
  72. [2019-02-20 12:38:51,974] {} DEBUG - Setting up DB connection pool (PID 16375)
  73. [2019-02-20 12:38:51,974] {} DEBUG - settings.configure_orm(): Using NullPool
  74. [2019-02-20 12:38:51,976] {} INFO - Filling up the DagBag from /etc/airflow/dags
  75. [2019-02-20 12:38:51,978] {} INFO - File /etc/airflow/dags/ assumed to contain no DAGs. Skipping.
  76. [2019-02-20 12:38:51,978] {} DEBUG - Importing /etc/airflow/dags/
  77. [2019-02-20 12:38:51,983] {} DEBUG - Loaded DAG <DAG: hbase-daily-export>
  78. [2019-02-20 12:38:51,984] {} DEBUG - Importing /etc/airflow/dags/
  79. [2019-02-20 12:38:51,985] {} DEBUG - Loaded DAG <DAG: test_dag>
  80. [2019-02-20 12:38:51,986] {} DEBUG - Importing /etc/airflow/dags/
  81. Creating dag
  82. Config DB client: <bw_config_tools.connect.bw_config.ConfigDbClient object at 0x7f032739b4e0>
  83. The key file given is /home/airflow/.ssh/id_rsa
  84. [2019-02-20 12:38:52,196] {} INFO - Using connection to: id: solr-mentions-cleanups. Host: some_host, Port: None, Schema: None, Login: some_user, Password: None, extra: {}
  85. extra connection info given:
  86. Key file in extra options: None
  87. SSH config file being used is /home/airflow/.ssh/config
  88. [2019-02-20 12:38:52,198] {} DEBUG - Loaded DAG <DAG: solr-mentions-cleanups>
  89. [2019-02-20 12:38:52,251] {} INFO - Running <TaskInstance: 2019-02-14T00:00:00+00:00 [success]> on host
  90. [2019-02-20 12:38:54,026] {} DEBUG - Setting up DB connection pool (PID 16453)
  91. [2019-02-20 12:38:54,027] {} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=3600, pid=16453
  92. [2019-02-20 12:38:54,207] {} DEBUG - Cannot import due to doesn't look like a module path
  93. [2019-02-20 12:38:54,362] {} INFO - Using executor LocalExecutor
  94. [2019-02-20 12:38:54,371] {} DEBUG - Adding <function default_action_log at 0x7f4345cfa8c8> to pre execution callback
  95. [2019-02-20 12:38:54,622] {} DEBUG - Calling callbacks: [<function default_action_log at 0x7f4345cfa8c8>]
  96. [2019-02-20 12:38:54,658] {} DEBUG - Setting up DB connection pool (PID 16453)
  97. [2019-02-20 12:38:54,658] {} DEBUG - settings.configure_orm(): Using NullPool
  98. [2019-02-20 12:38:54,660] {} INFO - Filling up the DagBag from /etc/airflow/dags/
  99. [2019-02-20 12:38:54,662] {} DEBUG - Importing /etc/airflow/dags/
  101. Config DB client: <bw_config_tools.connect.bw_config.ConfigDbClient object at 0x7f4308b5dc50>
  102. The key file given is /home/airflow/.ssh/id_rsa
  103. [2019-02-20 12:38:54,909] {} INFO - Using connection to: id: solr-mentions-cleanups. Host: some_host, Port: None, Schema: None, Login: some_user, Password: None, extra: {}
  104. extra connection info given:
  105. Key file in extra options: None
  106. SSH config file being used is /home/airflow/.ssh/config
  107. [2019-02-20 12:38:54,912] {} DEBUG - Loaded DAG <DAG: solr-mentions-cleanups>
  108. [2019-02-20 12:38:54,961] {} INFO - Running <TaskInstance: 2019-02-14T00:00:00+00:00 [success]> on host
  109. [2019-02-20 12:38:55,054] {} DEBUG - Calling callbacks: []
  110. [2019-02-20 12:38:55,054] {} DEBUG - Disposing DB connection pool (PID 16453)
  111. [2019-02-20 12:38:56,310] {} DEBUG - Calling callbacks: []
  112. [2019-02-20 12:38:56,313] {} DEBUG - Disposing DB connection pool (PID 16375)
Add Comment
Please, Sign In to add comment