Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- def setGlobalView(desc: Description, lineitem: DataFrame, session: SparkSession, query: String) = {
- val customer: DataFrame = desc.customer
- val orders : DataFrame = desc.orders
- val supplier: DataFrame = desc.supplier
- val nation : DataFrame = desc.nation
- val region : DataFrame = desc.region
- val part = desc.part
- val partsupp = desc.partsupp
- lineitem.createGlobalTempView("lineitem")
- customer.createGlobalTempView("customer")
- orders.createGlobalTempView("orders")
- supplier.createGlobalTempView("supplier")
- nation.createGlobalTempView("nation")
- region.createGlobalTempView("region")
- part.createGlobalTempView("part")
- partsupp.createGlobalTempView("partsupp")
- session.sql(query).rdd
- }
- def execute_Q1(desc: Description, session: SparkSession, params: List[Any]) = {
- // TODO: implement
- val queryNum = 1
- assert(params.size == 1)
- var lineitem = desc.lineitem
- val queryMap = desc.sampleDescription.asInstanceOf[Map[Int, Int]]
- //This is used to compensate when a sample is being used
- var compensate = ""
- if (queryMap != null && queryMap.contains(queryNum)) {
- val new_schema = getSchema(desc.lineitem)
- val sample_index = queryMap(queryNum)
- val rdd = desc.samples(sample_index).asInstanceOf[RDD[Row]]
- val new_lineitem = session.createDataFrame(rdd, new_schema)
- lineitem = new_lineitem
- compensate = " * ratio"
- }
- val q1 = s"""select
- l_returnflag,
- l_linestatus,
- sum(l_quantity ${compensate}) as sum_qty,
- sum(l_extendedprice ${compensate}) as sum_base_price,
- sum(l_extendedprice * (1 - l_discount) ${compensate}) as sum_disc_price,
- sum(l_extendedprice * (1 - l_discount) ${compensate} * (1 + l_tax)) as sum_charge,
- avg(l_quantity) as avg_qty,
- avg(l_extendedprice) as avg_price,
- avg(l_discount) as avg_disc,
- count(*) as count_order
- from
- global_temp.lineitem
- where
- l_shipdate <= date '1998-12-01' - interval '${params(0)}' day
- group by
- l_returnflag,
- l_linestatus
- order by
- l_returnflag,
- l_linestatus"""
- setGlobalView(desc, desc.lineitem, session, q1)
- // For example, for Q1, params(0) is the interval from the where close
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement