##############
# Cell 1 (pyspark)
##############
%pip install duckdb --pre --upgrade
##############
# Cell 2 (pyspark)
##############
import duckdb
import pathlib

# Enable VOrder and optimized write
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")

# Set the scale factor, which determines the size of the generated data model
sf = 10
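
A quick sanity check, assuming the same Spark session: the two settings can be read back to confirm they took effect.

print(spark.conf.get("spark.sql.parquet.vorder.enabled"))             # expect "true"
print(spark.conf.get("spark.microsoft.delta.optimizeWrite.enabled"))  # expect "true"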
##############
# Cell 3 (pyspark)
##############
%%time
# Load files to OneLake: generate the TPC-H data in sf steps and write each step's tables as Parquet
for x in range(0, sf):
    con = duckdb.connect()
    con.sql("PRAGMA disable_progress_bar; SET preserve_insertion_order=false")
    # Generate step x of the TPC-H data (one of sf partitions at scale factor sf)
    con.sql(f"CALL dbgen(sf={sf}, children={sf}, step={x})")
    for tbl in ['nation', 'region', 'customer', 'supplier', 'lineitem', 'orders', 'partsupp', 'part']:
        pathlib.Path(f'/lakehouse/default/Files/tpch{sf}/{tbl}').mkdir(parents=True, exist_ok=True)
        con.sql(f"COPY (SELECT * FROM {tbl}) TO '/lakehouse/default/Files/tpch{sf}/{tbl}/{x:02d}.parquet'")
    con.close()
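
A minimal sanity check before converting to Delta, assuming the same sf value and lakehouse paths used in Cell 3: count the Parquet output of one table directly with DuckDB.

con = duckdb.connect()
# Count the rows written for lineitem across all generated Parquet parts
rows = con.sql(
    f"SELECT count(*) FROM read_parquet('/lakehouse/default/Files/tpch{sf}/lineitem/*.parquet')"
).fetchone()[0]
print(f"lineitem rows at sf={sf}: {rows:,}")
con.close()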
##############
# Cell 4 (pyspark)
##############
# Convert all tables to Delta
from pyspark.sql.types import *

def loadFullDataFromSource(table_name):
    df = spark.read.parquet(f'Files/tpch{sf}/{table_name}/*.parquet')
    df.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")

full_tables = [
    'customer',
    'lineitem',
    'nation',
    'orders',
    'region',
    'partsupp',
    'supplier',
    'part'
]

for table in full_tables:
    loadFullDataFromSource(table)
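
A minimal sketch for verifying the Delta conversion, assuming the Tables/ and Files/ paths used above: read one table back from both locations and compare row counts.

delta_count = spark.read.format("delta").load("Tables/lineitem").count()
parquet_count = spark.read.parquet(f"Files/tpch{sf}/lineitem/*.parquet").count()
print(delta_count, parquet_count)  # the two counts should match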
##############
# Cell 5 (SQL)
##############
-- Create fact_orders
CREATE OR REPLACE TEMPORARY VIEW vw_fact_orders
AS
SELECT
    orders.*, lineitem.*
FROM
    orders AS orders
INNER JOIN
    lineitem AS lineitem
ON
    orders.o_orderkey = lineitem.l_orderkey;

-- Create dim_customer
CREATE OR REPLACE TEMPORARY VIEW vw_dim_customer
AS
SELECT
    customer.*, nation.n_name AS nation_name, region.r_name AS region_name
FROM
    customer
LEFT JOIN
    nation
ON
    customer.c_nationkey = nation.n_nationkey
LEFT JOIN
    region
ON
    nation.n_regionkey = region.r_regionkey;

-- Create dim_supplier
CREATE OR REPLACE TEMPORARY VIEW vw_dim_supplier
AS
SELECT
    supplier.*, nation.n_name AS nation_name, region.r_name AS region_name
FROM
    supplier
LEFT JOIN
    nation
ON
    supplier.s_nationkey = nation.n_nationkey
LEFT JOIN
    region
ON
    nation.n_regionkey = region.r_regionkey;
##############
# Cell 6 (pyspark)
##############
vw_fact_orders = spark.sql("SELECT * FROM vw_fact_orders")
vw_fact_orders.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/fact_orders")

vw_dim_customer = spark.sql("SELECT * FROM vw_dim_customer")
vw_dim_customer.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dim_customer")

vw_dim_supplier = spark.sql("SELECT * FROM vw_dim_supplier")
vw_dim_supplier.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dim_supplier")

vw_dim_part = spark.sql("SELECT * FROM part")
vw_dim_part.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dim_part")
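
A small sketch, assuming the Tables/ paths written above, that checks the customer relationship used by the dataset below: a left anti join returns any fact_orders rows whose o_custkey has no match in dim_customer.

fact = spark.read.format("delta").load("Tables/fact_orders")
dim_customer = spark.read.format("delta").load("Tables/dim_customer")
orphans = fact.join(dim_customer, fact["o_custkey"] == dim_customer["c_custkey"], "left_anti").count()
print(f"fact_orders rows with no matching dim_customer row: {orphans}")  # expect 0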
##############
Custom Dataset
##############
4 tables
- fact_orders
- dim_customer
- dim_supplier
- dim_part

3 relationships
- 1:M - dim_supplier[s_suppkey] -> fact_orders[l_suppkey]
- 1:M - dim_customer[c_custkey] -> fact_orders[o_custkey]
- 1:M - dim_part[p_partkey] -> fact_orders[l_partkey]

Measure
- Total Extended Price = SUM( fact_orders[l_extendedprice] )
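
For a cross-check of the Total Extended Price measure outside the semantic model, a PySpark sketch over the same tables (table and column names as created above) aggregates l_extendedprice by customer region:

from pyspark.sql import functions as F

fact = spark.read.format("delta").load("Tables/fact_orders")
dim_customer = spark.read.format("delta").load("Tables/dim_customer")

# Total extended price by customer region, following the dim_customer relationship above
(fact.join(dim_customer, fact["o_custkey"] == dim_customer["c_custkey"])
     .groupBy("region_name")
     .agg(F.sum("l_extendedprice").alias("total_extended_price"))
     .orderBy(F.desc("total_extended_price"))
     .show())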