Guest User

TPCH Notebook/Direct Lake Code

a guest
Aug 29th, 2023
182
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.64 KB | None | 0 0
  1. ##############
  2. # Cell 1 (pyspark)
  3. ##############
  4. pip install duckdb --pre --upgrade
  5.  
  6.  
  7. ##############
  8. # Cell 2 (pyspark)
  9. ##############
  10. import duckdb
  11. import pathlib
  12.  
  # Switch on V-Order and optimized write for this Spark session.
  for conf_key in (
      "spark.sql.parquet.vorder.enabled",
      "spark.microsoft.delta.optimizeWrite.enabled",
  ):
      spark.conf.set(conf_key, "true")

  # Scale factor: determines the size of the data model. The load cell also
  # uses it as the number of dbgen chunks and in the output folder name.
  sf = 10
  19.  
  20.  
  21.  
  22. ##############
  23. # Cell 3 (pyspark)
  24. ##############
  25. %%time
  26. # Load files to OneLake
  # Generate the TPC-H data with DuckDB in `sf` chunks and write every chunk
  # to OneLake as one parquet file per table:
  #   /lakehouse/default/Files/tpch{sf}/{table}/{chunk:02d}.parquet
  tpch_tables = ['nation', 'region', 'customer', 'supplier',
                 'lineitem', 'orders', 'partsupp', 'part']

  # The target folders do not depend on the chunk, so create them once up
  # front instead of on every iteration.
  for tbl in tpch_tables:
      pathlib.Path(f'/lakehouse/default/Files/tpch{sf}/{tbl}').mkdir(parents=True, exist_ok=True)

  for x in range(0, sf):
      # A new DuckDB connection is opened for every chunk (no database path
      # is passed to connect()).
      con = duckdb.connect()
      try:
          con.sql('PRAGMA disable_progress_bar;SET preserve_insertion_order=false')
          # Generate only chunk `x` out of `sf` chunks of the data set.
          con.sql(f"CALL dbgen(sf={sf} , children={sf}, step={x})")
          for tbl in tpch_tables:
              # File name is the zero-padded, two-digit chunk index.
              con.sql(f"COPY (SELECT * FROM {tbl}) TO '/lakehouse/default/Files/tpch{sf}/{tbl}/{x:02d}.parquet' ")
      finally:
          # Always release the connection, even when dbgen or COPY fails.
          con.close()
  35.  
  36.  
  37.  
  38. ##############
  39. # Cell 4 (pyspark)
  40. ##############
  41. # Convert all tables to Delta
  42. from pyspark.sql.types import *
  43.  
  def loadFullDataFromSource(table_name):
      """Rewrite one table's parquet files as a Delta table.

      Reads every ``*.parquet`` file under ``Files/tpch{sf}/<table_name>/``
      (``sf`` is the notebook-level scale factor) and saves the result in
      Delta format to ``Tables/<table_name>``, overwriting what is there.

      Args:
          table_name: Name of the table; used both as the source folder name
              and as the target Delta table name.
      """
      source_glob = f'Files/tpch{sf}/' + table_name + '/*.parquet'
      target_path = f"Tables/" + table_name

      parquet_df = spark.read.parquet(source_glob)
      delta_writer = parquet_df.write.mode("overwrite").format("delta")
      delta_writer.save(target_path)
  47.  
  # Every table that is converted from parquet to Delta, in load order.
  full_tables = ['customer', 'lineitem', 'nation', 'orders', 'region', 'partsupp', 'supplier', 'part']

  # Convert the tables one at a time.
  for table in full_tables:
      loadFullDataFromSource(table)
  61.  
  62.  
  63. ##############
  64. # Cell 5 (SQL)
  65. ##############
  66.  
  -- Fact view: all columns of each order together with all columns of each
  -- of its line items (inner join on the order key).
  CREATE OR REPLACE TEMPORARY VIEW vw_fact_orders AS
  SELECT
      o.*,
      l.*
  FROM orders AS o
  INNER JOIN lineitem AS l
      ON o.o_orderkey = l.l_orderkey;
  78.  
  -- Customer dimension: every customer column plus the names of the
  -- customer's nation and of that nation's region (left joins, so customers
  -- without a matching nation/region are kept).
  CREATE OR REPLACE TEMPORARY VIEW vw_dim_customer AS
  SELECT
      c.*,
      n.n_name AS nation_name,
      r.r_name AS region_name
  FROM customer AS c
  LEFT JOIN nation AS n
      ON c.c_nationkey = n.n_nationkey
  LEFT JOIN region AS r
      ON n.n_regionkey = r.r_regionkey;
  94.  
  -- Supplier dimension: every supplier column plus the names of the
  -- supplier's nation and of that nation's region (left joins, so suppliers
  -- without a matching nation/region are kept).
  CREATE OR REPLACE TEMPORARY VIEW vw_dim_supplier AS
  SELECT
      s.*,
      n.n_name AS nation_name,
      r.r_name AS region_name
  FROM supplier AS s
  LEFT JOIN nation AS n
      ON s.s_nationkey = n.n_nationkey
  LEFT JOIN region AS r
      ON n.n_regionkey = r.r_regionkey;
  110.  
  111.  
  112. ##############
  113. # Cell 6 (pyspark)
  114. ##############
  115.  
  def _overwrite_delta(df, path):
      """Save *df* as a Delta table at *path*, replacing data and schema."""
      writer = df.write.mode("overwrite").format("delta")
      writer.option("overwriteSchema", "true").save(path)


  # Materialize the fact view and the three dimensions as Delta tables.
  vw_fact_orders = spark.sql("SELECT * FROM vw_fact_orders")
  _overwrite_delta(vw_fact_orders, "Tables/fact_orders")

  vw_dim_customer = spark.sql("SELECT * FROM vw_dim_customer")
  _overwrite_delta(vw_dim_customer, "Tables/dim_customer")

  vw_dim_supplier = spark.sql("SELECT * FROM vw_dim_supplier")
  _overwrite_delta(vw_dim_supplier, "Tables/dim_supplier")

  # dim_part is a straight copy of the part table.
  vw_dim_part = spark.sql("SELECT * FROM part")
  _overwrite_delta(vw_dim_part, "Tables/dim_part")
  127.  
  128. ##############
  129. Custom Dataset
  130. ##############
  131.  
  132. 4 tables
  133. - fact_orders
  134. - dim_customer
  135. - dim_supplier
  136. - dim_part
  137.  
  138. 3 relationships
  139. - 1:M - dim_supplier[s_suppkey] -> fact_orders[l_suppkey]
  140. - 1:M - dim_customer[c_custkey] -> fact_orders[o_custkey]
  141. - 1:M - dim_part[p_partkey] -> fact_orders[l_partkey]
  142.  
  143. Measure
  144. - Total Extended Price = SUM( fact_orders[l_extendedprice] )
Advertisement
Add Comment
Please, Sign In to add comment