Guest User

Untitled

a guest
Sep 8th, 2018
137
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.30 KB | None | 0 0
  1. sqoop-import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  2. --username retail_dba \
  3. --password cloudera \
  4. --table orders \
  5. --target-dir /user/cloudera/problem1/orders \
  6. --as-avrodatafile \
  7. --compress \
  8. --compression-codec org.apache.hadoop.io.compress.SnappyCodec
  9.  
  10. sqoop-import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  11. --username retail_dba \
  12. --password cloudera \
  13. --table order_items \
  14. --compress \
  15. --compression-codec org.apache.hadoop.io.compress.SnappyCodec \
  16. --target-dir /user/cloudera/problem1/order-items \
  17. --as-avrodatafile
  18.  
  19. from pyspark import SparkContext,SparkConf,Row,HiveContext
  20.  
  21. conf=SparkConf().setAppName("problem-one").setMaster("yarn-client")
  22. sc=SparkContext(conf=conf)
  23. sqlContext=HiveContext(sc)
  24.  
  25.  
  26. orders=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/orders")
  27.  
  28. orderItems=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/order-items")
  29.  
  30. orders -orderid orderdate order customer id orderstatus
  31. orderItems - order_item_id orderitemorderid orderitemproductid orderquanty orderitemsubtotal orderitemproductprice
  32.  
  33. orders.registerTempTable("orders")
  34. orderItems.registerTempTable("order_items")
  35.  
  36. sqlResult = sqlContext.sql("select to_date(from_unixtime(cast(o.order_date/1000 as bigint))) as order_formatted_date,o.order_status,sum(oi.order_item_subtotal) as total_amount,count(*) as total_orders from orders o join order_items oi on o.order_id=oi.order_item_order_id group by to_date(from_unixtime(cast(o.order_date/1000 as bigint))),o.order_status order by order_formatted_date desc,o.order_status")
  37.  
  38. sqlContext.setConf("spark.sql.shuffle.partitions","4")
  39. sqlContext.setConf("spark.sql.parquet.compression.codec","gzip")
  40. sqlResult.write.parquet("/user/cloudera/problem1/parquet-gzip")
  41.  
  42. sqlContext.setConf("spark.sql.parquet.compression.codec","snappy")
  43. sqlResult.write.parquet("/user/cloudera/problem1/parquet-snappy")
  44.  
  45. sqlResult.map(lambda rec: (str(rec[0])+","+str(rec[1])+","+str(rec[2])+","+str(rec[3]))).saveAsTextFile(/user/cloudera/problem1/text)
  46.  
  47.  
  48. ------------------------------------------------------------------
  49. ordersData=sc.textFile("/user/cloudera/table/orders")
  50. orderItemsData=sc.textFile("/user/cloudera/table/order_items")
  51.  
  52. orders=ordersData.map(lambda rec: (int(rec.split(",")[0]),rec))
  53. orderItems=orderItemsData.map(lambda rec:(int(rec.split(",")[1]),rec))
  54.  
  55. orderJoinOrderitems=orders.join(orderItems)
  56.  
  57. joinMap = orderJoinOrderItems.map(lambda rec:((rec[1][0].split(",")[1][:10],rec[1][0].split(",")[3]),float(rec[1][1].split(",")[4])))
  58.  
  59. revenueAndCount=joinMap.aggregateByKey((0.0,0),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1]))
  60.  
  61.  
  62. dataRec=revenueAndCount.map(lambda rec:(rec[0][0]+","+rec[0][1]+","+str(rec[1][0])+","+str(rec[1][1]))).saveAsTextFile("/user/cloudera/arun/ansOnewithRDD")
  63.  
  64. -----------------------------------------------------------------------
  65.  
  66.  
  67. create table retail_db.result(order_date varchar(255) not null,
  68. order_status varchar(255) not null, total_orders int,total_amount numeric,
  69. constraint pk_order_result primary key(order_date,order_status))
  70.  
  71. sqoop export --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  72. --username retail_dba \
  73. --password cloudera \
  74. --table result \
  75. --export-dir "/user/cloudera/problem1/result4a-csv" \
  76. --columns "order_date,order_status,total_amount,total_orders"
Add Comment
Please, Sign In to add comment