Guest User

Untitled

a guest
Nov 13th, 2018
134
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.40 KB | None | 0 0
  1. sqoop-import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table orders --as-avrodatafile --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec --target-dir /user/cloudera/problem1/orders
  2.  
  3. sqoop-import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table order_items --as-avrodatafile --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec --target-dir /user/cloudera/problem1/order-items
  4.  
  5. """
  6. from pyspark import SparkContext,SparkConf,row,HiveContext
  7.  
  8. conf=SparkConf().setAppName("problem1").setMaster("yarn-client")
  9. sc=SparkContext(conf=conf)
  10. sqlContext=HiveContext(sc)
  11.  
  12. ordersDF = sqlContext.read.avro("/user/cloudera/problem1/orders","com.databricks.spark.avro")
  13. orderItemsDF = sqlContext.read.avro("/user/cloudera/problem1/order-items","com.databricks.spark.avro")
  14.  
  15.  
  16.  
  17.  
  18. ordersDF = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/orders")
  19. orderItemsDF = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/order-items")
  20.  
  21.  
  22. ordersJoinorderItems=ordersDF.join(orderItemsDF, ordersDF.order_id == orderItemsDF.order_item_order_id)
  23.  
  24. ordersMapOrderItems=ordersJoinorderItems.map(lambda rec: ((to_date(from_unixtime(rec.order_date/1000)),rec.order_status),(rec.order_id,1))
  25.  
  26. from pyspark.sql.functions import unix_timestamp
  27.  
  28. """
  29.  
  30. ordersDF=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/orders")
  31. orderItemsDF=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/order-items")
  32. ordersJoinOrderitems=ordersDF.join(orderItemsDF,ordersDF.order_id == orderItemsDF.order_item_order_id)
  33. ordersMapOrderItems=ordersJoinOrderitems.map(lambda rec: ((rec.order_date,rec.order_status),(rec.order_id,1)))
  34.  
  35. ordersMapOrderItems=ordersJoinOrderitems.map(lambda rec: ((from_unixtime(rec.order_date/1000),rec.order_status),(rec.order_id,1)))
  36.  
  37. from pyspark.sql.functions import unix_timestamp,from_unixtime,to_date
  38.  
  39.  
  40. ordersJoinOrderitems.registerTempTable("ordersJoin")
  41. sqlResult=sqlContext.sql("select to_date(from_unixtime(cast(order_date/1000 as bigint))) order_formatted,order_status, sum(order_item_subtotal) total_amount,count(order_id) total_orders from ordersJoin group by to_date(from_unixtime(cast(order_date/1000 as bigint))),order_status order by order_formatted desc, order_status")
  42.  
  43. sqlContext.setConf("spark.sql.parquet.compression.codec","gzip")
  44. sqlContext.setConf("spark.sql.shuffle.partition","4")
  45.  
  46. sqlResult.write.parquet("/user/cloudera/problem1/result4b-gzip")
  47.  
  48. sqlContext.setConf("spark.sql.parquet.compression.codec","snappy")
  49. sqlResult.write.parquet("/user/cloudera/problem1/result4b-snappy")
  50.  
  51.  
  52. sqlResult.map(lambda rec: (str(rec.order_formatted)+","+str(rec.order_status)+","+str(rec.total_amount)+","+str(rec.total_orders))).saveAsTextFile("/user/cloudera/problem1/result4b-csv")
  53.  
  54.  
Log in to MySQL using the command below: mysql -h localhost -u retail_dba -p
  56.  
  57. create table retail_db.result(order_date varchar(255) not null,order_status varchar(255) not null, total_amount numeric, total_orders int, constraint pk_order_result primary key (order_date,order_status));
  58.  
  59.  
  60. sqoop-export --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table result --export-dir /user/cloudera/problem1/result4b-csv --columns "order_date, order_status,total_amount, total_orders"
Add Comment
Please, Sign In to add comment