Advertisement
Guest User

Untitled

a guest
Apr 26th, 2017
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.41 KB | None | 0 0
  1. package exercise
  2.  
  3. import com.typesafe.config.ConfigFactory
  4. import org.apache.hadoop.fs.{FileSystem, Path}
  5. import org.apache.spark.{SparkConf, SparkContext}
  6. import org.apache.spark.sql.SQLContext
  7. import org.apache.spark.sql.functions._
  8.  
  /**
   * Spark driver that sums `order_item_subtotal` per `department_name` for orders whose
   * `order_status` is `COMPLETE` or `PENDING`, and writes the result as text files.
   *
   * Command-line arguments:
   *   - `args(0)`: name of the Typesafe Config section whose `executionMode` key holds the
   *     Spark master URL.
   *   - `args(1)`: base input path; the sub-paths `orders`, `order_items`, `products`,
   *     `categories` and `departments` are read from it as comma-separated text.
   *   - `args(2)`: output path; it is deleted recursively if it already exists.
   *
   * NOTE(review): the case classes `Orders`, `OrderItems`, `Products`, `Categories` and
   * `Departments` are not defined in this block; presumably they live elsewhere in this
   * package and expose the column names referenced below -- confirm.
   */
  object RevPerDeptWithDF {

    /**
     * Entry point: validates the argument count, builds the SparkContext and runs the job.
     *
     * @param args see the object-level documentation
     * @throws IllegalArgumentException if fewer than three arguments are supplied or the
     *                                  input path does not exist
     */
    def main(args: Array[String]): Unit = {
      require(
        args.length >= 3,
        "Usage: RevPerDeptWithDF <config-section> <input-path> <output-path>")

      val props = ConfigFactory.load()
      val conf = new SparkConf()
        .setAppName("Rev Per Dept With DF")
        .setMaster(props.getConfig(args(0)).getString("executionMode"))
      val sc = new SparkContext(conf)

      // Always release the context, even when the job fails part-way through.
      try {
        run(sc, inputPath = args(1), outputPath = args(2))
      } finally {
        sc.stop()
      }
    }

    /** Loads the five input data sets, joins them and saves revenue per department. */
    private def run(sc: SparkContext, inputPath: String, outputPath: String): Unit = {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)

      // Validate the input BEFORE touching the output: previously a bad input path only
      // printed a message and the existing output directory was still deleted.
      if (!fs.exists(new Path(inputPath))) {
        println("Invalid input path")
        throw new IllegalArgumentException(s"Input path does not exist: $inputPath")
      }

      val op = new Path(outputPath)
      if (fs.exists(op)) {
        fs.delete(op, true) // recursive delete so saveAsTextFile can recreate the directory
      }

      val ordersDF = sc.textFile(inputPath + "/orders").map { rec =>
        val r = rec.split(",")
        Orders(r(0).toInt, r(1), r(2).toInt, r(3))
      }.toDF()

      val orderItemsDF = sc.textFile(inputPath + "/order_items").map { rec =>
        val a = rec.split(",")
        OrderItems(
          a(0).toInt,
          a(1).toInt,
          a(2).toInt,
          a(3).toInt,
          a(4).toFloat,
          a(5).toFloat)
      }.toDF()

      // NOTE(review): lines starting with "685" are skipped; presumably this works around a
      // malformed product record, but it also drops any other line with that prefix -- confirm.
      val productsDF = sc.textFile(inputPath + "/products")
        .filter(rec => !rec.startsWith("685"))
        .map { rec =>
          val r = rec.split(",")
          Products(r(0).toInt, r(1).toInt, r(2), r(3), r(4).toFloat, r(5))
        }.toDF()

      val categoriesDF = sc.textFile(inputPath + "/categories").map { rec =>
        val r = rec.split(",")
        Categories(r(0).toInt, r(1).toInt, r(2))
      }.toDF()

      val deptDF = sc.textFile(inputPath + "/departments").map { rec =>
        val r = rec.split(",")
        Departments(r(0).toInt, r(1))
      }.toDF()

      val selectedOrders = ordersDF.filter(
        ordersDF("order_status") === "COMPLETE" || ordersDF("order_status") === "PENDING")

      // orders -> order items -> products -> categories -> departments, then aggregate.
      val revenuePerDept = selectedOrders
        .join(orderItemsDF, ordersDF("order_id") === orderItemsDF("order_item_order_id"))
        .join(productsDF, orderItemsDF("order_item_product_id") === productsDF("product_id"))
        .join(categoriesDF, productsDF("product_category_id") === categoriesDF("category_id"))
        .join(deptDF, categoriesDF("category_department_id") === deptDF("department_id"))
        .groupBy(deptDF("department_name"))
        .agg(sum(orderItemsDF("order_item_subtotal")))
        .sort(deptDF("department_name"))

      revenuePerDept.rdd.saveAsTextFile(outputPath)
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement