Guest User

Untitled

a guest
Jun 30th, 2018
154
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.63 KB | None | 0 0
  1. /*
  2. sqoop import \
  3. --connect jdbc:mysql://ms.itversity.com:3306/retail_db \
  4. --username retail_user \
  5. --password itversity \
  6. --table orders \
  7. --target-dir data/retail_db/orders \
  8. --as-textfile \
  9. --num-mappers 1
  10. */
  11.  
  12. /*
  13. sqoop import \
  14. --connect jdbc:mysql://ms.itversity.com:3306/retail_db \
  15. --username retail_user \
  16. --password itversity \
  17. --table customers \
  18. --target-dir data/retail_db/customers \
  19. --as-textfile \
  20. --num-mappers 1
  21. */
  22.  
  23. /* Data is available in local file system /data/retail_db
  24. Source directories: /data/retail_db/orders and /data/retail_db/customers
  25. Source delimiter: comma (“,”)
  26. Source Columns - orders - order_id, order_date, order_customer_id, order_status
  27. Source Columns - customers - customer_id, customer_fname, customer_lname and many more
  28. Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
  29. Target Columns: customer_lname, customer_fname
  30. Number of files - 1
  31. Target Directory: /user/<YOUR_USER_ID>/solutions/solutions02/inactive_customers
  32. Target File Format: TEXT
  33. Target Delimiter: comma (“, ”)
  34. Compression: N/A */
  35.  
  36. /*
  37. spark-shell --master yarn \
  38. --conf spark.ui.port=12345 \
  39. --num-executors 1 \
  40. --executor-cores 1 \
  41. --executor-memory 2G
  42. */
  43.  
  // Read the comma-delimited orders and customers text files as RDDs of lines.
  val orders = sc.textFile("data/retail_db/orders")
  val customers = sc.textFile("data/retail_db/customers")
  // Keep (order_id, order_customer_id); each line is split once instead of once per field.
  val ordersRDD = orders.map { rec =>
    val fields = rec.split(",")
    (fields(0), fields(2))
  }
  // Keep (customer_id, customer_fname, customer_lname) - the first three customer columns.
  val customersRDD = customers.map { rec =>
    val fields = rec.split(",")
    (fields(0), fields(1), fields(2))
  }
  // Name the tuple positions so they can be referenced as columns from Spark SQL.
  val ordersDF = ordersRDD.toDF("order_id", "order_customer_id")
  val customersDF = customersRDD.toDF("customer_id", "customer_fname", "customer_lname")
  53.  
  //sqlContext
  // Register both DataFrames under the table names used by the query below.
  ordersDF.registerTempTable("orders")
  customersDF.registerTempTable("customers")
  // The left outer join keeps every customer; rows whose order side is NULL are the
  // customers without any order. Sorted by last name, then first name.
  val sqlResult = sqlContext.sql("select customer_lname, customer_fname "+
    "from customers c left outer join orders o on "+
    "o.order_customer_id = c.customer_id "+
    "where o.order_customer_id is null "+
    "order by c.customer_lname, c.customer_fname")
  // Write a single text file with fields separated by ", " as the task statement requires.
  // The target is an absolute path: the previous relative "user/gontiv/..." would be
  // resolved under the current user's home directory instead of /user/gontiv.
  sqlResult.coalesce(1).rdd.
    map(rec => rec.mkString(", ")).
    saveAsTextFile("/user/gontiv/solutions/solutions02/inactive_customers")
  65.  
  //CoreApi
  // Same result using only RDD operations. ordersRDD and customersRDD are declared again
  // here with a key/value shape suitable for joining; when entered in spark-shell they
  // shadow the earlier definitions of the same name.
  // Orders keyed by customer id: (order_customer_id, order_id); each line is split once.
  val ordersRDD = orders.map { rec =>
    val fields = rec.split(",")
    (fields(2), fields(0))
  }
  // Customers keyed by customer id: (customer_id, (customer_lname, customer_fname)).
  val customersRDD = customers.map { rec =>
    val fields = rec.split(",")
    (fields(0), (fields(2), fields(1)))
  }
  // Every customer is kept; the order side is None when the customer has no orders.
  val leftJoinData = customersRDD.leftOuterJoin(ordersRDD)
  // Keep only customers without orders, drop the customer id so that (lname, fname)
  // becomes the key, then sort by last name and first name.
  val temp = leftJoinData.
    filter { case (_, (_, order)) => order.isEmpty }.
    map { case (_, nameAndOrder) => nameAndOrder }.
    sortByKey()
  // Write a single text file as "lname, fname", the delimiter given in the task statement.
  // The target is an absolute path: the previous relative "user/gontiv/..." would be
  // resolved under the current user's home directory instead of /user/gontiv.
  temp.
    map { case ((lname, fname), _) => lname + ", " + fname }.
    coalesce(1).
    saveAsTextFile("/user/gontiv/solutions/solutions02_core/inactive_customers")
Add Comment
Please, Sign In to add comment