import pathlib
import json
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.models.variable import Variable
from airflow.exceptions import AirflowException

# Read the project and credential paths from Airflow Variables
PROJECT_DIR = 'top100_postgress_datamart'
PROJECTS_PATH = Variable.get('PROJECTS_PATH', '/default/path/to/projects')
CREDS_DIR = pathlib.Path(Variable.get('CREDS_DIR', '/default/path/to/creds'))

# Make sure the credentials directory exists
if not CREDS_DIR.exists():
    raise AirflowException(f"CREDS_DIR {CREDS_DIR} not found.")

project_path = pathlib.Path(PROJECTS_PATH) / PROJECT_DIR

db_creds_path = CREDS_DIR / 'db_credentials.json'
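# The exact layout of db_credentials.json is not part of this file; judging by
# the keys read below, it presumably looks something like this (all values are
# placeholders):
# {
#     "postgres": {
#         "dbhost": "analytics-db.local",
#         "dbport": 5432,
#         "dbuser": "analytics",
#         "dbpass": "***",
#         "dbname": "datamart"
#     }
# }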
# Check that the credentials file exists and load it
if db_creds_path.exists():
    with open(db_creds_path) as file:
        creds = json.load(file)
    DB_ANALYTICS = creds.get('postgres', {})

    # Environment passed to every BashOperator below
    env_vars = {
        'DBHOST': DB_ANALYTICS.get('dbhost', ''),
        'DBPORT': str(DB_ANALYTICS.get('dbport', '')),
        'DBUSER': DB_ANALYTICS.get('dbuser', ''),
        'DBPASS': DB_ANALYTICS.get('dbpass', ''),
        'DBNAME': DB_ANALYTICS.get('dbname', ''),
        'CREDS_DIR': str(CREDS_DIR),
        'TG_CREDS': Variable.get('TG_CREDS', ''),
        'HADOOP_CONF_DIR': Variable.get('HADOOP_CONF_DIR', ''),
        'HADOOP_USER_NAME': Variable.get('HADOOP_USER_NAME', ''),
        'HADOOP_HOME': Variable.get('HADOOP_HOME', ''),
        'LOGS_DIR': Variable.get('LOGS_DIR', ''),
    }
else:
    raise FileNotFoundError(f"Credentials file not found: {db_creds_path}")

default_args = {
    'owner': 'a.gubich',
    'retries': 3,
    'retry_delay': timedelta(seconds=30),
    'execution_timeout': timedelta(minutes=15),
}

with DAG(
    dag_id='top100_postgress_datamart2',
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    default_args=default_args,
    catchup=True,
    max_active_runs=1,
    tags=['a.gubich', 'spark'],
) as dag:

    # Build the spark-submit command; $date is substituted by bash from the
    # task's env, where it is rendered from the Jinja template '{{ ds }}'
    command = (
        f'/opt/spark/bin/spark-submit '
        f'--conf spark.yarn.maxAppAttempts=1 --master yarn '
        f'--deploy-mode cluster '
        f'--queue Analytics '
        f'{project_path}/hdfs_to_hdfs.py '
        f'-start_date $date '
        f'-end_date $date'
    )

    # Task that runs spark-submit on the cluster
    task_hdfs_to_hdfs = BashOperator(
        task_id='hdfs_to_hdfs',
        bash_command=command,
        env={
            'date': '{{ ds }}',
            'PYSPARK_PYTHON': '/usr/local/bin/python3.9',
            'HTTP_PROXY': '',
            'HTTPS_PROXY': '',
            **env_vars,
        },
        queue='spark',
        execution_timeout=timedelta(days=1),
    )

    # Command for the second task, which writes the data to ClickHouse
    command = f'python3 {project_path}/hdfs_to_ch.py -start_date $start_date -end_date $end_date'

    task_hdfs_to_ch = BashOperator(
        task_id='hdfs_to_ch',
        bash_command=command,
        env={
            'start_date': '{{ execution_date.date() }}',
            'end_date': '{{ execution_date.date() }}',
            'PYSPARK_PYTHON': '/usr/local/bin/python3.9',
            'HTTP_PROXY': '',
            'HTTPS_PROXY': '',
            **env_vars,
        },
        queue='spark',
        execution_timeout=timedelta(days=1),
    )

    # Final no-op task marking the end of the run
    done = DummyOperator(task_id='done')

    # Task dependencies
    task_hdfs_to_hdfs >> task_hdfs_to_ch >> done
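# The DAG reads the Airflow Variables PROJECTS_PATH, CREDS_DIR, TG_CREDS,
# HADOOP_CONF_DIR, HADOOP_USER_NAME, HADOOP_HOME and LOGS_DIR (falling back to
# defaults where one is given). One way to provision them is via the Airflow
# CLI; the paths here are placeholders, not values from this project:
#
#   airflow variables set PROJECTS_PATH /srv/airflow/projects
#   airflow variables set CREDS_DIR /srv/airflow/creds
#
# A single scheduled date can then be executed without the scheduler, e.g.:
#
#   airflow dags test top100_postgress_datamart2 2024-01-01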