Advertisement
Guest User

Untitled

a guest
May 19th, 2019
105
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.50 KB | None | 0 0
  /**
    * Registers the tables of `desc` (plus the supplied `lineitem`) as Spark global
    * temporary views and runs `query` against them.
    *
    * The views are registered under the names `lineitem`, `customer`, `orders`,
    * `supplier`, `nation`, `region`, `part` and `partsupp`, so `query` must refer to
    * them through the `global_temp` database (e.g. `global_temp.lineitem`).
    *
    * @param desc     source of every table except `lineitem`
    * @param lineitem DataFrame to expose as `global_temp.lineitem`; passed separately
    *                 from `desc` so the caller can substitute a different DataFrame
    * @param session  Spark session used to execute the query
    * @param query    SQL text to execute
    * @return the query result as an RDD (the `.rdd` of `session.sql(query)`)
    */
  def setGlobalView(desc: Description, lineitem: DataFrame, session: SparkSession, query: String) = {
    // Global temp views outlive a single call, and createGlobalTempView throws when the
    // name is already registered. createOrReplace lets this method be invoked repeatedly
    // (e.g. once per query, or with a different lineitem) within one Spark application.
    lineitem.createOrReplaceGlobalTempView("lineitem")
    desc.customer.createOrReplaceGlobalTempView("customer")
    desc.orders.createOrReplaceGlobalTempView("orders")
    desc.supplier.createOrReplaceGlobalTempView("supplier")
    desc.nation.createOrReplaceGlobalTempView("nation")
    desc.region.createOrReplaceGlobalTempView("region")
    desc.part.createOrReplaceGlobalTempView("part")
    desc.partsupp.createOrReplaceGlobalTempView("partsupp")

    session.sql(query).rdd
  }
  /**
    * Runs query 1 (an aggregation over `lineitem` grouped by return flag and line
    * status) and returns the result rows.
    *
    * If `desc.sampleDescription` maps query number 1 to a sample index, the query is
    * run over that sample instead of the full `lineitem` table, and the `sum`
    * aggregates are multiplied by the `ratio` column to compensate for sampling.
    *
    * @param desc    table/sample description; `sampleDescription` may be null
    * @param session Spark session used to build the sample DataFrame and run the query
    * @param params  exactly one element: the number of days subtracted from
    *                1998-12-01 in the `where` clause
    * @return the query result as an RDD of rows
    */
  def execute_Q1(desc: Description, session: SparkSession, params: List[Any]): RDD[Row] = {
    val queryNum = 1
    assert(params.size == 1)

    // sampleDescription may be null; Option(...) turns that into "no sample configured".
    val sampleIndex: Option[Int] =
      Option(desc.sampleDescription.asInstanceOf[Map[Int, Int]]).flatMap(_.get(queryNum))

    // Choose the table to query and the factor that scales the sums back up when a
    // sample is used.
    // NOTE(review): assumes getSchema(desc.lineitem) describes the sample rows,
    // including a `ratio` column - confirm against getSchema and the sampler.
    val (lineitem, compensate) = sampleIndex match {
      case Some(index) =>
        val sampleRows = desc.samples(index).asInstanceOf[RDD[Row]]
        (session.createDataFrame(sampleRows, getSchema(desc.lineitem)), " * ratio")
      case None =>
        (desc.lineitem, "")
    }

    // NOTE(review): avg(...) and count(*) are not compensated when a sample is used;
    // left unchanged here - confirm whether that is intended.
    val q1 = s"""select
                    l_returnflag,
                    l_linestatus,
                    sum(l_quantity ${compensate}) as sum_qty,
                    sum(l_extendedprice ${compensate}) as sum_base_price,
                    sum(l_extendedprice * (1 - l_discount) ${compensate}) as sum_disc_price,
                    sum(l_extendedprice * (1 - l_discount) ${compensate} * (1 + l_tax)) as sum_charge,
                    avg(l_quantity) as avg_qty,
                    avg(l_extendedprice) as avg_price,
                    avg(l_discount) as avg_disc,
                    count(*) as count_order
               from
                    global_temp.lineitem
               where
                    l_shipdate <= date '1998-12-01' - interval '${params(0)}' day
               group by
                    l_returnflag,
                    l_linestatus
               order by
                    l_returnflag,
                 l_linestatus"""

    // Pass the selected lineitem (the sample when one is configured), not desc.lineitem:
    // otherwise the sample is never queried and the `ratio` compensation is applied to
    // the full table.
    setGlobalView(desc, lineitem, session, q1)
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement