Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Counts, per status value, the orders that have no matching order-item record,
// then prints each (status, count) pair on the driver.
// Intended for spark-shell: `sc` (the SparkContext) must already be in scope.

// Orders keyed by field 0 (parsed as Int) with field 3 as the value.
// NOTE(review): presumably field 0 is the order id and field 3 the order status — confirm against the data set.
val orders = sc.textFile("/public/retail_db/orders")
val ordersMap = orders.map { line =>
  val fields = line.split(",") // split once and reuse for both fields
  (fields(0).toInt, fields(3))
}

// Order items keyed by field 1 (parsed as Int); the whole raw line is kept as the value.
// NOTE(review): presumably field 1 is the owning order's id — confirm against the data set.
val orderItems = sc.textFile("/public/retail_db/order_items")
val orderItemsMap = orderItems.map(line => (line.split(",")(1).toInt, line))

// Left outer join keeps every order; the item side is None when no item shares the key.
val ordersLeftOuterJoin = ordersMap.leftOuterJoin(orderItemsMap)

// Keep only the orders for which the join found no order item.
val ordersLeftOuterJoinFiltered = ordersLeftOuterJoin.filter {
  case (_, (_, item)) => item.isEmpty
}

// Emit (status, 1) per remaining order and sum the ones per status.
val orderCountByStatus = ordersLeftOuterJoinFiltered.
  map { case (_, (status, _)) => (status, 1) }.
  reduceByKey(_ + _)

// Bring the aggregated counts to the driver and print them.
orderCountByStatus.collect.foreach(println)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement