Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Get top N customers by revenue for each day
- # COMPLETE and CLOSED orders
- # Use Spark SQL
- from pyspark import SparkConf,SparkContext
- from pyspark.sql import HiveContext, Row
- import sys
def table_path(base_dir, table_name):
    """Return the location of table_name underneath base_dir.

    Exactly one "/" is inserted between the two parts when base_dir does
    not already end with one, so the script behaves the same whether or
    not the base directory argument carries a trailing slash. An empty
    base_dir yields the bare (relative) table name.
    """
    if not base_dir:
        return table_name
    if base_dir.endswith("/"):
        return base_dir + table_name
    return base_dir + "/" + table_name


def parse_order(line):
    """Turn one comma-separated orders record into a dict of column values.

    The line is split once; fields 0 and 2 are converted to int, fields
    1 and 3 are kept as strings.
    """
    fields = line.split(",")
    return {
        "order_id": int(fields[0]),
        "order_date": fields[1],
        "order_customer_id": int(fields[2]),
        "order_status": fields[3],
    }


def parse_order_item(line):
    """Turn one comma-separated order_items record into a dict of column values.

    Fields 0-3 are converted to int, fields 4-5 to float.
    """
    fields = line.split(",")
    return {
        "order_item_id": int(fields[0]),
        "order_item_order_id": int(fields[1]),
        "order_item_product_id": int(fields[2]),
        "order_item_quantity": int(fields[3]),
        "order_item_subtotal": float(fields[4]),
        "order_item_product_price": float(fields[5]),
    }


def parse_customer(line):
    """Turn one comma-separated customers record into a dict of column values.

    Field 0 is converted to int; fields 1-8 are kept as strings. The line
    is split on every comma, so a field that itself contains a comma will
    shift the columns that follow it.
    """
    fields = line.split(",")
    return {
        "customer_id": int(fields[0]),
        "customer_fname": fields[1],
        "customer_lname": fields[2],
        "customer_email": fields[3],
        "customer_password": fields[4],
        "customer_street": fields[5],
        "customer_city": fields[6],
        "customer_state": fields[7],
        "customer_zipcode": fields[8],
    }


def load_table(sc, base_dir, table_name, parse_line):
    """Read a text table, convert it to a DataFrame and register it.

    sc         -- the active SparkContext
    base_dir   -- directory that contains the table's data
    table_name -- sub-directory to read; also the temp table name
    parse_line -- function mapping one text line to a dict of columns

    Returns the DataFrame that was registered as a temp table.
    """
    rows = sc.textFile(table_path(base_dir, table_name)). \
        map(lambda line: Row(**parse_line(line)))
    tableDF = rows.toDF()
    tableDF.registerTempTable(table_name)
    return tableDF


def main(argv):
    """Write the top N customers by revenue for each day as JSON.

    argv is expected in sys.argv form:
        argv[1] -- Spark master / execution mode
        argv[2] -- N, the number of ranks to keep per day
        argv[3] -- base directory holding orders, order_items and customers
        argv[4] -- output directory for the JSON result

    Exits with a usage message when fewer than four arguments are given;
    int() raises ValueError when N is not an integer.
    """
    if len(argv) < 5:
        raise SystemExit(
            "Usage: SCRIPT executionMode topN inputBaseDir outputDir")

    executionMode = argv[1]
    topN = int(argv[2])
    inputBaseDir = argv[3]
    outputDir = argv[4]

    conf = SparkConf(). \
        setAppName("Top " + str(topN) + " customers per day"). \
        setMaster(executionMode)
    sc = SparkContext(conf=conf)
    try:
        # The HiveContext must exist before toDF() is called on an RDD.
        sqlContext = HiveContext(sc)

        load_table(sc, inputBaseDir, "orders", parse_order)
        load_table(sc, inputBaseDir, "order_items", parse_order_item)
        load_table(sc, inputBaseDir, "customers", parse_customer)

        sqlContext.setConf("spark.sql.shuffle.partitions", "2")

        # Revenue per customer per day, restricted to COMPLETE/CLOSED orders.
        dailyRevenueQuery = (
            "select order_date, order_customer_id, "
            "sum(order_item_subtotal) daily_revenue_per_customer "
            "from orders o join order_items oi "
            "on o.order_id = oi.order_item_order_id "
            "where order_status in ('COMPLETE', 'CLOSED') "
            "group by order_date, order_customer_id"
        )
        sqlContext.sql(dailyRevenueQuery). \
            registerTempTable("daily_revenue_per_customer")

        # rank() gives tied revenues the same rank, so a day can contribute
        # more than topN rows when there are ties. topN has already been
        # through int(), so concatenating it into the SQL text is safe.
        topNQuery = (
            "select order_date, "
            "concat(concat(customer_fname, ', '), customer_lname) customer_name, "
            "daily_revenue_per_customer from "
            "(select order_date, order_customer_id, "
            "daily_revenue_per_customer, "
            "rank() over (partition by order_date order by daily_revenue_per_customer desc) rnk "
            "from daily_revenue_per_customer) q join customers c "
            "on c.customer_id = q.order_customer_id "
            "where rnk <= " + str(topN) + " "
            "order by order_date, rnk"
        )
        topNCustomersPerDay = sqlContext.sql(topNQuery)

        # DataFrame.save() is the deprecated pre-1.4 API; the writer
        # interface is its replacement.
        topNCustomersPerDay.write.json(outputDir)
    finally:
        # Release the Spark resources even when a step above fails.
        sc.stop()


if __name__ == "__main__":
    main(sys.argv)
Add Comment
Please, Sign In to add comment