Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from airflow import DAG
- from airflow.operators.python import PythonOperator
- from airflow.hooks.base import BaseHook
- import psycopg2
- from datetime import datetime as dt, timedelta
- import pandas as pd
- import json
# DAG that rebuilds the cdm.dm_settlement_report mart.
# The cron expression '0 1 * * *' fires once a day at 01:00.
dag = DAG(
    dag_id='mart_dm_settlement_report_update',
    start_date=dt(year=2023, month=8, day=20),
    schedule_interval='0 1 * * *',
    # Do not back-fill runs for intervals between start_date and now.
    catchup=False,
    # Upper bound on how long a single DAG run may take.
    dagrun_timeout=timedelta(hours=1),
)
def mart_loader():
    """Rebuild the ``cdm.dm_settlement_report`` mart from the DDS layer.

    Looks up the Airflow connection ``PG_WAREHOUSE_CONNECTION``, connects to
    the ``de`` database on that server and, inside a single transaction,
    truncates ``cdm.dm_settlement_report`` (restarting its identity column)
    and re-inserts one row per restaurant and settlement date, aggregated
    over orders whose status is ``'CLOSED'``.

    Per-row figures computed by the query:
      * ``order_processing_fee`` = 25% of the summed order totals;
      * ``restaurant_reward_sum`` = order totals - bonus payments - fee.

    Returns:
        int: always ``200`` (kept so existing consumers of the task's
        return value keep working).

    Raises:
        psycopg2.Error: propagated from connecting or executing the SQL.
            In that case the transaction is rolled back, so the TRUNCATE is
            undone and the previous mart contents stay in place.
    """
    psql_conn = BaseHook.get_connection('PG_WAREHOUSE_CONNECTION')

    # Pass credentials as keyword arguments instead of formatting them into
    # a DSN string: a quote or backslash in the password would corrupt a
    # hand-built DSN, and a missing port would be rendered as the literal
    # text 'None'. psycopg2 does the escaping itself and ignores None values.
    conn = psycopg2.connect(
        dbname='de',
        port=psql_conn.port,
        user=psql_conn.login,
        host=psql_conn.host,
        password=psql_conn.password,
    )

    dm_settlement_report = """
        TRUNCATE TABLE cdm.dm_settlement_report RESTART IDENTITY;
        INSERT INTO
            cdm.dm_settlement_report
            (
                restaurant_id, restaurant_name, settlement_date, orders_count, orders_total_sum,
                orders_bonus_payment_sum, orders_bonus_granted_sum, order_processing_fee, restaurant_reward_sum
            )
        WITH total_sum_orsers AS (
            SELECT
                order_id,
                SUM(total_sum) AS orders_total_sum,
                SUM(bonus_payment) AS orders_bonus_payment_sum,
                SUM(bonus_grant) AS orders_bonus_granted_sum
            FROM
                dds.fct_product_sales fps
            GROUP BY
                order_id
        )
        SELECT
            dmo.restaurant_id AS restaurant_id,
            r.restaurant_name,
            ts.date AS settlement_date,
            COUNT(*) AS orders_count,
            SUM(tso.orders_total_sum) AS orders_total_sum,
            SUM(tso.orders_bonus_payment_sum) AS orders_bonus_payment_sum,
            SUM(tso.orders_bonus_granted_sum) AS orders_bonus_granted_sum,
            SUM(tso.orders_total_sum) * 0.25 AS order_processing_fee,
            (SUM(tso.orders_total_sum)) - (SUM(tso.orders_bonus_payment_sum)) - (SUM(tso.orders_total_sum) * 0.25) AS restaurant_reward_sum
        FROM
            dds.dm_orders AS dmo
            INNER JOIN dds.dm_timestamps AS ts ON ts.id = dmo.timestamp_id
            INNER JOIN dds.dm_restaurants AS r ON r.id = dmo.restaurant_id
            INNER JOIN total_sum_orsers tso ON tso.order_id = dmo.id
        WHERE
            dmo.order_status ='CLOSED'
        GROUP BY
            dmo.restaurant_id,
            r.restaurant_name,
            ts."date";
    """

    try:
        # `with conn` commits on success and rolls back on an exception;
        # `with conn.cursor()` closes the cursor. Neither closes the
        # connection itself, hence the explicit close in `finally`.
        with conn, conn.cursor() as cur:
            cur.execute(dm_settlement_report)
    finally:
        conn.close()

    return 200
# The DAG's only task: run mart_loader as a Python callable.
load_mart = PythonOperator(
    dag=dag,
    python_callable=mart_loader,
    task_id='load_mart',
)

# Bare reference to the task; with a single task there are no
# dependencies to declare.
load_mart
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement