Advertisement
AliaksandrLet

Спринт 5/12 → Тема 4/7: Реализация DWH → Урок 8/9 → Задание 1. Заполните таблицу dm_settlement_report

Aug 21st, 2023 (edited)
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.41 KB | None | 0 0
  1. from airflow import DAG
  2. from airflow.operators.python import PythonOperator
  3. from airflow.hooks.base import BaseHook
  4. import psycopg2
  5.  
  6. from datetime import datetime as dt, timedelta
  7. import pandas as pd
  8. import json
  9.  
  10. dag = DAG(
  11.     dag_id='mart_dm_settlement_report_update',
  12.     schedule_interval='0 1 * * *',
  13.     start_date=dt(2023, 8, 20),
  14.     catchup=False,
  15.     dagrun_timeout=timedelta(minutes=60)
  16. )
  17.  
  18.  
  19. def mart_loader():
  20.     psql_conn = BaseHook.get_connection('PG_WAREHOUSE_CONNECTION')
  21.     conn = psycopg2.connect(
  22.         f"dbname='de' port='{psql_conn.port}' user='{psql_conn.login}' host='{psql_conn.host}' password='{psql_conn.password}'")
  23.     cur = conn.cursor()
  24.  
  25.     dm_settlement_report = """
  26.         TRUNCATE TABLE cdm.dm_settlement_report RESTART IDENTITY;
  27.         INSERT INTO
  28.             cdm.dm_settlement_report
  29.         (
  30.             restaurant_id, restaurant_name, settlement_date, orders_count, orders_total_sum,
  31.             orders_bonus_payment_sum, orders_bonus_granted_sum, order_processing_fee, restaurant_reward_sum
  32.         )
  33.         WITH total_sum_orsers AS (
  34.             SELECT
  35.                 order_id,
  36.                 SUM(total_sum) AS orders_total_sum,
  37.                 SUM(bonus_payment) AS orders_bonus_payment_sum,
  38.                 SUM(bonus_grant) AS orders_bonus_granted_sum
  39.             FROM
  40.                 dds.fct_product_sales fps
  41.             GROUP BY
  42.                 order_id
  43.         )
  44.         SELECT
  45.             dmo.restaurant_id AS restaurant_id,
  46.             r.restaurant_name,
  47.             ts.date AS settlement_date,
  48.             COUNT(*) AS orders_count,
  49.             SUM(tso.orders_total_sum) AS orders_total_sum,
  50.             SUM(tso.orders_bonus_payment_sum) AS orders_bonus_payment_sum,
  51.             SUM(tso.orders_bonus_granted_sum) AS orders_bonus_granted_sum,
  52.             SUM(tso.orders_total_sum) * 0.25 AS order_processing_fee,
  53.             (SUM(tso.orders_total_sum)) - (SUM(tso.orders_bonus_payment_sum)) - (SUM(tso.orders_total_sum) * 0.25) AS restaurant_reward_sum
  54.         FROM
  55.             dds.dm_orders AS dmo
  56.             INNER JOIN dds.dm_timestamps AS ts ON ts.id = dmo.timestamp_id
  57.             INNER JOIN dds.dm_restaurants AS r ON r.id = dmo.restaurant_id
  58.             INNER JOIN total_sum_orsers tso ON tso.order_id = dmo.id
  59.         WHERE
  60.             dmo.order_status ='CLOSED'
  61.         GROUP BY
  62.             dmo.restaurant_id,
  63.             r.restaurant_name,
  64.             ts."date";
  65.    """
  66.     cur.execute(dm_settlement_report)
  67.     conn.commit()
  68.  
  69.     cur.close()
  70.     conn.close()
  71.  
  72.     return 200
  73.  
  74. load_mart = PythonOperator(task_id='load_mart',
  75.                                             python_callable=mart_loader,
  76.                                             dag=dag)
  77.  
  78. load_mart
  79.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement