Advertisement
Guest User

Untitled

a guest
Mar 29th, 2017
55
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.06 KB | None | 0 0
  1. package retail
  2.  
  3. import com.typesafe.config.ConfigFactory
  4. import org.apache.hadoop.fs.{FileSystem, Path}
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. import org.apache.spark.sql.hive.HiveContext
  7.  
/**
 * Created by itversity on 27/03/17.
 *
 * build.sbt:
 *
 * name := "doc"
 *
 * version := "1.0"
 *
 * scalaVersion := "2.10.6"
 *
 * libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.2"
 * libraryDependencies += "com.typesafe" % "config" % "1.3.1"
 * libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.2"
 * libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.2"
 *
 * spark-submit invocation:
 *
 * spark-submit --class retail.DailyRevenuePerDayPerDepartmentHive \
 *   --master yarn \
 *   --conf spark.ui.port=25613 \
 *   doc_2.10-1.0.jar /user/dgadiraju/DailyRevenuePerDayPerDepartmentHive prod
 */
  28.  
  29. object DailyRevenuePerDayPerDepartmentHive {
  30. def main(args: Array[String]) {
  31. val appConf = ConfigFactory.load()
  32. val conf = new SparkConf().
  33. setAppName("Revenue by department per day").
  34. setMaster(appConf.getConfig(args(1)).getString("executionMode"))
  35. val sc = new SparkContext(conf)
  36. val sqlContext = new HiveContext(sc)
  37.  
  38. val outputPath = args(0)
  39.  
  40. val fs = FileSystem.get(sc.hadoopConfiguration)
  41. val outputPathExists = fs.exists(new Path(outputPath))
  42.  
  43. if(outputPathExists)
  44. fs.delete(new Path(outputPath), true)
  45.  
  46. sqlContext.sql("use doc")
  47. sqlContext.setConf("spark.sql.shuffle.partitions", "2")
  48.  
  49. sqlContext.sql("select o.order_date, d.department_name, sum(oi.order_item_subtotal) order_revenue " +
  50. "from departments d join categories c on d.department_id = c.category_department_id " +
  51. "join products p on c.category_id = p.product_category_id " +
  52. "join order_items oi on p.product_id = oi.order_item_product_id " +
  53. "join orders o on oi.order_item_order_id = o.order_id " +
  54. "where o.order_status in ('COMPLETE', 'CLOSED') " +
  55. "group by o.order_date, d.department_name " +
  56. "order by o.order_date, d.department_name").
  57. rdd.
  58. map(rec => rec.mkString("\t")).
  59. saveAsTextFile(outputPath)
  60. }
  61. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement