Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package retail
- import com.typesafe.config.ConfigFactory
- import org.apache.hadoop.fs.{FileSystem, Path}
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.sql.hive.HiveContext
- /**
- * Created by itversity on 27/03/17.
- * build.sbt
- name := "doc"
- version := "1.0"
- scalaVersion := "2.10.6"
- libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
- libraryDependencies += "com.typesafe" % "config" % "1.3.1"
- libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
- libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"
- * spark-submit
- spark-submit --class retail.DailyRevenuePerDayPerDepartmentHive \
- --master yarn \
- --conf spark.ui.port=25613 \
- doc_2.10-1.0.jar /user/dgadiraju/DailyRevenuePerDayPerDepartmentHive prod
- */
object DailyRevenuePerDayPerDepartmentHive {
  /**
   * Spark batch job: computes total revenue per (order_date, department_name)
   * over COMPLETE/CLOSED orders in the Hive database `doc`, and writes the
   * result as tab-delimited text to the given HDFS path.
   *
   * @param args args(0) = HDFS output path (deleted first if it exists);
   *             args(1) = config key (e.g. "prod") whose "executionMode"
   *             entry supplies the Spark master.
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an opaque
    // ArrayIndexOutOfBoundsException when invoked with too few arguments.
    if (args.length < 2) {
      System.err.println(
        "Usage: DailyRevenuePerDayPerDepartmentHive <outputPath> <environment>")
      System.exit(1)
    }

    val appConf = ConfigFactory.load()
    val conf = new SparkConf().
      setAppName("Revenue by department per day").
      setMaster(appConf.getConfig(args(1)).getString("executionMode"))
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)

    val outputPath = args(0)
    // saveAsTextFile fails if the target directory already exists,
    // so remove any previous run's output up front.
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val outPath = new Path(outputPath)
    if (fs.exists(outPath))
      fs.delete(outPath, true)

    try {
      sqlContext.sql("use doc")
      // Small result set: a low shuffle-partition count avoids many tiny output files.
      sqlContext.setConf("spark.sql.shuffle.partitions", "2")
      sqlContext.sql("select o.order_date, d.department_name, sum(oi.order_item_subtotal) order_revenue " +
        "from departments d join categories c on d.department_id = c.category_department_id " +
        "join products p on c.category_id = p.product_category_id " +
        "join order_items oi on p.product_id = oi.order_item_product_id " +
        "join orders o on oi.order_item_order_id = o.order_id " +
        "where o.order_status in ('COMPLETE', 'CLOSED') " +
        "group by o.order_date, d.department_name " +
        "order by o.order_date, d.department_name").
        rdd.
        map(rec => rec.mkString("\t")).
        saveAsTextFile(outputPath)
    } finally {
      // Always release the YARN application / executors, even on failure.
      sc.stop()
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement