Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- sqoop import \
- --connect jdbc:mysql://ms.itversity.com:3306/retail_db \
- --username retail_user \
- --password itversity \
- --table orders \
- --target-dir data/retail_db/orders \
- --as-textfile \
- --num-mappers 1
- */
- /*
- sqoop import \
- --connect jdbc:mysql://ms.itversity.com:3306/retail_db \
- --username retail_user \
- --password itversity \
- --table customers \
- --target-dir data/retail_db/customers \
- --as-textfile \
- --num-mappers 1
- */
- /* Data is available in local file system /data/retail_db
- Source directories: /data/retail_db/orders and /data/retail_db/customers
- Source delimiter: comma (",")
- Source Columns - orders - order_id, order_date, order_customer_id, order_status
- Source Columns - customers - customer_id, customer_fname, customer_lname and many more
- Get the customers who have not placed any orders, sorted by customer_lname and then customer_fname
- Target Columns: customer_lname, customer_fname
- Number of files - 1
- Target Directory: /user/<YOUR_USER_ID>/solutions/solutions02/inactive_customers
- Target File Format: TEXT
- Target Delimiter: comma (",")
- Compression: N/A */
- /*
- spark-shell --master yarn \
- --conf spark.ui.port=12345 \
- --num-executors 1 \
- --executor-cores 1 \
- --executor-memory 2G
- */
// Load the raw comma-delimited text written to HDFS by the sqoop imports above.
// These paths are relative, so they resolve against the user's HDFS working/home directory.
val orders = sc.textFile("data/retail_db/orders")
val customers = sc.textFile("data/retail_db/customers")

// orders -> (order_id, order_customer_id): fields 0 and 2 of each line.
// Each line is split once rather than once per extracted field.
val ordersRDD = orders.map(rec => {
  val r = rec.split(",")
  (r(0), r(2))
})

// customers -> (customer_id, customer_fname, customer_lname): fields 0, 1 and 2.
val customersRDD = customers.map(rec => {
  val r = rec.split(",")
  (r(0), r(1), r(2))
})

// NOTE(review): toDF on an RDD of tuples assumes the SQL implicits that spark-shell
// auto-imports (sqlContext.implicits._ / spark.implicits._).
val ordersDF = ordersRDD.toDF("order_id", "order_customer_id")
val customersDF = customersRDD.toDF("customer_id", "customer_fname", "customer_lname")
// Spark SQL solution: expose both DataFrames as tables and anti-join customers against orders.
ordersDF.registerTempTable("orders")
customersDF.registerTempTable("customers")

// The LEFT OUTER JOIN keeps every customer. Customers without a matching order get a
// NULL order_customer_id, and only those rows pass the WHERE clause.
// Each line ends in "+" so the REPL keeps reading the expression.
val sqlResult = sqlContext.sql("select customer_lname, customer_fname "+
  "from customers c left outer join orders o on "+
  "o.order_customer_id = c.customer_id "+
  "where o.order_customer_id is null "+
  "order by c.customer_lname, c.customer_fname")

// Write one comma-delimited file. The path is absolute: a relative "user/..." path would
// resolve under the HDFS home directory (typically /user/<id>), nesting the output as
// /user/<id>/user/<id>/... instead of the required target directory.
// Mapping via .rdd makes saveAsTextFile available for both Spark 1.x DataFrames and
// Spark 2.x Datasets (Dataset.map returns a Dataset, which has no saveAsTextFile).
sqlResult.coalesce(1).rdd.map(rec => rec.mkString(",")).saveAsTextFile("/user/gontiv/solutions/solutions02/inactive_customers")
// Core API solution: the same anti-join expressed with pair RDDs.
// Trailing dots keep multi-line method chains open in the spark-shell REPL.

// orders keyed by order_customer_id (field 2) -> order_id (field 0); each line is split once.
val ordersRDD = orders.map(rec => {
  val r = rec.split(",")
  (r(2), r(0))
})

// customers keyed by customer_id -> (customer_lname, customer_fname). The name tuple is
// last-name-first, so sorting on it yields lname order, then fname order.
val customersRDD = customers.map(rec => {
  val r = rec.split(",")
  (r(0), (r(2), r(1)))
})

// Elements are (customer_id, ((lname, fname), Option[order_id])); None means the
// customer has no matching order.
val leftJoinData = customersRDD.leftOuterJoin(ordersRDD)

// Keep only customers with no orders, re-key on (lname, fname) and sort by that key.
val temp = leftJoinData.
  filter(rec => rec._2._2.isEmpty).
  map(rec => rec._2).
  sortByKey()

// Write a single file using the plain "," target delimiter (the same format as the Spark SQL
// solution) to an absolute HDFS path; a relative "user/..." path would nest under the home directory.
temp.map(rec => rec._1._1 + "," + rec._1._2).coalesce(1).
  saveAsTextFile("/user/gontiv/solutions/solutions02_core/inactive_customers")
Add Comment
Please, Sign In to add comment