# Get top N customers by revenue for each day
# COMPLETE and CLOSED orders
# Use Spark SQL

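# Assumed CSV layout (no header), matching the fields parsed below:
#   orders:      order_id,order_date,order_customer_id,order_status
#   order_items: order_item_id,order_item_order_id,order_item_product_id,
#                order_item_quantity,order_item_subtotal,order_item_product_price
#   customers:   customer_id,customer_fname,customer_lname,customer_email,customer_password,
#                customer_street,customer_city,customer_state,customer_zipcode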

import sys

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext, Row

# Command-line arguments: execution mode for setMaster (e.g. local, yarn-client),
# top N, input base directory (must end with "/"), and output directory
executionMode = sys.argv[1]
topN = int(sys.argv[2])
inputBaseDir = sys.argv[3]
outputDir = sys.argv[4]
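# Hypothetical invocation (script name and paths are assumptions, adjust to taste):
# spark-submit get_top_n_daily_customers.py local[2] 5 /public/retail_db/ /user/training/top_customers_daily
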
conf = SparkConf().setAppName("Top " + str(topN) + " customers per day").setMaster(executionMode)
sc = SparkContext(conf=conf)
# HiveContext is required for window functions such as rank() in Spark 1.x
sqlContext = HiveContext(sc)

# Build the orders DataFrame; split each CSV line once and map it to a Row
ordersRDD = sc.textFile(inputBaseDir + "orders")
ordersDF = ordersRDD. \
    map(lambda o: o.split(",")). \
    map(lambda o: Row(order_id=int(o[0]), order_date=o[1],
                      order_customer_id=int(o[2]), order_status=o[3])). \
    toDF()
ordersDF.registerTempTable("orders")
# sqlContext.sql("select * from orders").show()

# Build the order_items DataFrame; order_item_subtotal carries the revenue
orderItemsRDD = sc.textFile(inputBaseDir + "order_items")
orderItemsDF = orderItemsRDD. \
    map(lambda oi: oi.split(",")). \
    map(lambda oi: Row(order_item_id=int(oi[0]),
                       order_item_order_id=int(oi[1]),
                       order_item_product_id=int(oi[2]),
                       order_item_quantity=int(oi[3]),
                       order_item_subtotal=float(oi[4]),
                       order_item_product_price=float(oi[5]))). \
    toDF()
orderItemsDF.registerTempTable("order_items")

# Build the customers DataFrame (path composed the same way as the other two inputs)
customersRDD = sc.textFile(inputBaseDir + "customers")
customersDF = customersRDD. \
    map(lambda c: c.split(",")). \
    map(lambda c: Row(customer_id=int(c[0]),
                      customer_fname=c[1],
                      customer_lname=c[2],
                      customer_email=c[3],
                      customer_password=c[4],
                      customer_street=c[5],
                      customer_city=c[6],
                      customer_state=c[7],
                      customer_zipcode=c[8])). \
    toDF()
customersDF.registerTempTable("customers")

# Keep shuffle parallelism low; the default of 200 partitions is overkill here
sqlContext.setConf("spark.sql.shuffle.partitions", "2")

# Aggregate revenue per customer per day, restricted to COMPLETE and CLOSED orders
sqlContext. \
    sql("select order_date, order_customer_id, "
        "sum(order_item_subtotal) daily_revenue_per_customer "
        "from orders o join order_items oi "
        "on o.order_id = oi.order_item_order_id "
        "where order_status in ('COMPLETE', 'CLOSED') "
        "group by order_date, order_customer_id"). \
    registerTempTable("daily_revenue_per_customer")
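# Optional sanity check on the intermediate aggregate:
# sqlContext.sql("select * from daily_revenue_per_customer limit 10").show()
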
# Rank customers by revenue within each date and keep the top N.
# Note: rank() gives ties the same rank, so a day can return more than N rows
# when revenues tie; use row_number() instead to cap at exactly N.
topNCustomersPerDay = sqlContext.sql(
    "select order_date, "
    "concat(customer_fname, ', ', customer_lname) customer_name, "
    "daily_revenue_per_customer from "
    "(select order_date, order_customer_id, "
    "daily_revenue_per_customer, "
    "rank() over (partition by order_date order by daily_revenue_per_customer desc) rnk "
    "from daily_revenue_per_customer) q join customers c "
    "on c.customer_id = q.order_customer_id "
    "where rnk <= " + str(topN) + " "
    "order by order_date, rnk")
# save(path, source) is the old Spark 1.x writer API; on Spark 1.4+ the
# DataFrameWriter form in the commented line below is preferred
topNCustomersPerDay.save(outputDir, "json")
# topNCustomersPerDay.write.json(outputDir)
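
# To spot-check the JSON output on HDFS (outputDir is whatever argv[4] was):
# hdfs dfs -cat <outputDir>/part-* | head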