import pathlib
import os
import json
import getpass
from datetime import datetime, timedelta
from airflow import DAG
from airflow import macros
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.models.variable import Variable
from airflow.exceptions import AirflowException
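# Airflow Variables read by this DAG (assumed to be defined in the deployment):
# PROJECTS_PATH, CREDS_DIR, TG_CREDS, HADOOP_CONF_DIR, HADOOP_USER_NAME,
# HADOOP_HOME, LOGS_DIR. The defaults passed below are only fallbacks.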
# Check and initialise environment-related settings
PROJECT_DIR = 'top100_postgress_datamart'
PROJECTS_PATH = Variable.get('PROJECTS_PATH', '/default/path/to/projects')
CREDS_DIR = pathlib.Path(Variable.get('CREDS_DIR', '/default/path/to/creds'))

# Make sure the credentials directory exists
if not CREDS_DIR.exists():
    raise AirflowException(f"CREDS_DIR {CREDS_DIR} not found.")

project_path = pathlib.Path(PROJECTS_PATH) / PROJECT_DIR
db_creds_path = CREDS_DIR / 'db_credentials.json'
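# Assumed layout of db_credentials.json, inferred from the keys read below
# (adjust if the real file differs):
# {
#   "postgres": {
#     "dbhost": "...", "dbport": 5432, "dbuser": "...",
#     "dbpass": "...", "dbname": "..."
#   }
# }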
# Load database credentials if the file exists
if db_creds_path.exists():
    with open(db_creds_path) as file:
        creds = json.load(file)
    DB_ANALYTICS = creds.get('postgres', {})
    # Environment variables passed to every BashOperator below
    env_vars = {
        'DBHOST': DB_ANALYTICS.get('dbhost', ''),
        'DBPORT': str(DB_ANALYTICS.get('dbport', '')),
        'DBUSER': DB_ANALYTICS.get('dbuser', ''),
        'DBPASS': DB_ANALYTICS.get('dbpass', ''),
        'DBNAME': DB_ANALYTICS.get('dbname', ''),
        'CREDS_DIR': str(CREDS_DIR),
        'TG_CREDS': Variable.get('TG_CREDS', ''),
        'HADOOP_CONF_DIR': Variable.get('HADOOP_CONF_DIR', ''),
        'HADOOP_USER_NAME': Variable.get('HADOOP_USER_NAME', ''),
        'HADOOP_HOME': Variable.get('HADOOP_HOME', ''),
        'LOGS_DIR': Variable.get('LOGS_DIR', '')
    }
else:
    raise FileNotFoundError(f"Credentials file not found: {db_creds_path}")
default_args = {
    'owner': 'a.gubich',
    'retries': 3,
    'retry_delay': timedelta(seconds=30),
    'execution_timeout': timedelta(minutes=15),
}
with DAG(
    dag_id='top100_postgress_datamart2',
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2025, 1, 1),
    schedule_interval='@daily',
    default_args=default_args,
    catchup=True,
    max_active_runs=1,
    tags=['a.gubich', 'spark'],
) as dag:
    # Build the spark-submit command; $date is expanded by bash
    # from the templated 'date' entry in the task's env below
    command = (
        f'/opt/spark/bin/spark-submit '
        f'--conf spark.yarn.maxAppAttempts=1 --master yarn '
        f'--deploy-mode cluster '
        f'--queue Analytics '
        f'{project_path}/hdfs_to_hdfs.py '
        f'-start_date $date '
        f'-end_date $date'
    )

    # Task that runs the Spark job via spark-submit
    task_hdfs_to_hdfs = BashOperator(
        task_id='hdfs_to_hdfs',
        bash_command=command,
        env={
            'date': '{{ ds }}',
            'PYSPARK_PYTHON': "/usr/local/bin/python3.9",
            'HTTP_PROXY': '',
            'HTTPS_PROXY': '',
            **env_vars
        },
        queue='spark',
        execution_timeout=timedelta(days=1),
    )
    # Command for the second job, which writes data into ClickHouse
    command = f'python3 {project_path}/hdfs_to_ch.py -start_date $start_date -end_date $end_date'

    task_hdfs_to_ch = BashOperator(
        task_id='hdfs_to_ch',
        bash_command=command,
        env={
            'start_date': '{{ execution_date.date() }}',
            'end_date': '{{ execution_date.date() }}',
            'PYSPARK_PYTHON': "/usr/local/bin/python3.9",
            'HTTP_PROXY': '',
            'HTTPS_PROXY': '',
            **env_vars
        },
        queue='spark',
        execution_timeout=timedelta(days=1),
    )

    # Final no-op task marking completion
    done = DummyOperator(task_id='done')

    # Dependencies: HDFS copy, then load into ClickHouse, then done
    task_hdfs_to_hdfs >> task_hdfs_to_ch >> done