Not a member of Pastebin yet?
Sign Up —
it unlocks many cool features!
# Step 1: import both source tables from MySQL into HDFS as Snappy-compressed
# Avro files; these are the inputs for the Spark aggregation below.
- sqoop-import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table orders --as-avrodatafile --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec --target-dir /user/cloudera/problem1/orders
# Same import for the order_items detail table. Note the hyphenated target dir
# name ("order-items") — the Spark loads below must reference the same path.
- sqoop-import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table order_items --as-avrodatafile --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec --target-dir /user/cloudera/problem1/order-items
# NOTE(review): dead exploratory code, fenced off inside a triple-quoted string
# so it never executes. Known problems if it were ever revived:
#   - `row` and `HiveContext` live in pyspark.sql, not the top-level pyspark package
#   - DataFrameReader has no .avro(path, format) method in this form; the
#     .format("com.databricks.spark.avro").load(path) calls further down are the
#     working variant
#   - the ordersMapOrderItems lambda on the line using to_date/from_unixtime is
#     missing a closing parenthesis, and Column functions cannot be used inside
#     an RDD map lambda anyway
# Commented-out fragments like this should normally be deleted before committing;
# kept here only because the file is a worked exercise log.
- """
- from pyspark import SparkContext,SparkConf,row,HiveContext
- conf=SparkConf().setAppName("problem1").setMaster("yarn-client")
- sc=SparkContext(conf=conf)
- sqlContext=HiveContext(sc)
- ordersDF = sqlContext.read.avro("/user/cloudera/problem1/orders","com.databricks.spark.avro")
- orderItemsDF = sqlContext.read.avro("/user/cloudera/problem1/order-items","com.databricks.spark.avro")
- ordersDF = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/orders")
- orderItemsDF = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/order-items")
- ordersJoinorderItems=ordersDF.join(orderItemsDF, ordersDF.order_id == orderItemsDF.order_item_order_id)
- ordersMapOrderItems=ordersJoinorderItems.map(lambda rec: ((to_date(from_unixtime(rec.order_date/1000)),rec.order_status),(rec.order_id,1))
- from pyspark.sql.functions import unix_timestamp
- """
# Step 2 (run in the pyspark shell): aggregate order revenue and order counts
# per (order date, order status) and write the result as gzip parquet, snappy
# parquet, and plain CSV text.
# Assumes `sqlContext` (a HiveContext) is already available, as in the pyspark
# shell — TODO confirm; nothing in this block creates it.
from pyspark.sql.functions import unix_timestamp, from_unixtime, to_date

# Load the Avro files produced by the sqoop imports above.
ordersDF = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/orders")
orderItemsDF = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/order-items")

# Join order headers to their detail rows; order_item_order_id is the FK back
# to orders.order_id.
ordersJoinOrderitems = ordersDF.join(orderItemsDF, ordersDF.order_id == orderItemsDF.order_item_order_id)

# NOTE(review): two dead `ordersMapOrderItems = ...map(lambda rec: ...)` RDD
# assignments were removed here — their result was never used, and the second
# applied the Column function from_unixtime inside an RDD lambda, which fails
# at runtime. The aggregation is done entirely in SQL below.

# order_date is epoch milliseconds, hence the /1000 before from_unixtime.
ordersJoinOrderitems.registerTempTable("ordersJoin")
sqlResult = sqlContext.sql("select to_date(from_unixtime(cast(order_date/1000 as bigint))) order_formatted,order_status, sum(order_item_subtotal) total_amount,count(order_id) total_orders from ordersJoin group by to_date(from_unixtime(cast(order_date/1000 as bigint))),order_status order by order_formatted desc, order_status")

# gzip-compressed parquet output.
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
# BUG FIX: the property is spark.sql.shuffle.partitions (plural). The original
# singular "spark.sql.shuffle.partition" was silently ignored, leaving the
# default 200 shuffle partitions (and 200 tiny output files).
sqlContext.setConf("spark.sql.shuffle.partitions", "4")
sqlResult.write.parquet("/user/cloudera/problem1/result4b-gzip")

# snappy-compressed parquet output of the same result.
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlResult.write.parquet("/user/cloudera/problem1/result4b-snappy")

# Plain CSV text output; this is the file the sqoop export below pushes back
# into MySQL, so the column order here must match its --columns list.
sqlResult.map(lambda rec: (str(rec.order_formatted)+","+str(rec.order_status)+","+str(rec.total_amount)+","+str(rec.total_orders))).saveAsTextFile("/user/cloudera/problem1/result4b-csv")
-- Step 3: create the target table and export the Spark CSV back into MySQL.
-- Log in first with: mysql -h localhost -u retail_dba -p
--
-- FIX: the original declared total_amount as bare NUMERIC, which in MySQL
-- means DECIMAL(10,0) and silently truncates the fractional part of the
-- summed subtotals; an explicit scale preserves the cents.
-- order_date arrives as a formatted date string from the CSV, so it is stored
-- as VARCHAR here and forms the composite primary key with order_status.
create table retail_db.result(
    order_date varchar(255) not null,
    order_status varchar(255) not null,
    total_amount numeric(17, 2),
    total_orders int,
    constraint pk_order_result primary key (order_date, order_status)
);

# Export the CSV produced by the Spark job; --columns must match the CSV field
# order (order_formatted, order_status, total_amount, total_orders).
sqoop-export --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --table result --export-dir /user/cloudera/problem1/result4b-csv --columns "order_date, order_status,total_amount, total_orders"
Add Comment
Please sign in to add a comment.