Not a member of Pastebin yet? Sign up — it unlocks many cool features!
# Import the `orders` table from MySQL into HDFS as Snappy-compressed Avro.
# (Fixed: stray "- " paste-artifact prefixes broke every backslash continuation.)
sqoop-import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username retail_dba \
  --password cloudera \
  --table orders \
  --target-dir /user/cloudera/problem1/orders \
  --as-avrodatafile \
  --compress \
  --compression-codec org.apache.hadoop.io.compress.SnappyCodec
# Import the `order_items` table the same way (Snappy-compressed Avro).
# (Fixed: stray "- " paste-artifact prefixes broke every backslash continuation.)
sqoop-import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username retail_dba \
  --password cloudera \
  --table order_items \
  --compress \
  --compression-codec org.apache.hadoop.io.compress.SnappyCodec \
  --target-dir /user/cloudera/problem1/order-items \
  --as-avrodatafile
# ---------------------------------------------------------------------------
# Spark SQL solution: read the Avro imports, aggregate revenue and order
# counts per (day, order_status), and write the result as gzip parquet,
# snappy parquet, and comma-separated text.
#
# Fixes: `Row` and `HiveContext` are not exported by the top-level `pyspark`
# package (they live in pyspark.sql); the two bare schema-note lines were
# syntax errors (now comments); the saveAsTextFile path was missing quotes.
# ---------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = SparkConf().setAppName("problem-one").setMaster("yarn-client")
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

orders = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/orders")
orderItems = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/order-items")

# Schemas for reference:
#   orders:      order_id, order_date, order_customer_id, order_status
#   order_items: order_item_id, order_item_order_id, order_item_product_id,
#                order_item_quantity, order_item_subtotal, order_item_product_price
orders.registerTempTable("orders")
orderItems.registerTempTable("order_items")

# order_date is epoch milliseconds, so divide by 1000 before from_unixtime.
sqlResult = sqlContext.sql("select to_date(from_unixtime(cast(o.order_date/1000 as bigint))) as order_formatted_date,o.order_status,sum(oi.order_item_subtotal) as total_amount,count(*) as total_orders from orders o join order_items oi on o.order_id=oi.order_item_order_id group by to_date(from_unixtime(cast(o.order_date/1000 as bigint))),o.order_status order by order_formatted_date desc,o.order_status")

# Keep the shuffle small (exam-sized data), then write each requested format.
sqlContext.setConf("spark.sql.shuffle.partitions", "4")
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
sqlResult.write.parquet("/user/cloudera/problem1/parquet-gzip")
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlResult.write.parquet("/user/cloudera/problem1/parquet-snappy")

# Text output: one comma-joined line per row.
# (Fixed: the path literal was unquoted, which is a SyntaxError.)
sqlResult.map(lambda rec: (str(rec[0]) + "," + str(rec[1]) + "," + str(rec[2]) + "," + str(rec[3]))).saveAsTextFile("/user/cloudera/problem1/text")
- ------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Pure-RDD version of the same aggregation, reading the raw CSV copies.
# Fix: the join result was bound as `orderJoinOrderitems` but then read as
# `orderJoinOrderItems` — a guaranteed NameError. One consistent name now.
# ---------------------------------------------------------------------------
ordersData = sc.textFile("/user/cloudera/table/orders")
orderItemsData = sc.textFile("/user/cloudera/table/order_items")

# Key orders by order_id (field 0) and order_items by order_item_order_id (field 1).
orders = ordersData.map(lambda rec: (int(rec.split(",")[0]), rec))
orderItems = orderItemsData.map(lambda rec: (int(rec.split(",")[1]), rec))

orderJoinOrderItems = orders.join(orderItems)

# ((order_date truncated to yyyy-MM-dd, order_status), order_item_subtotal)
joinMap = orderJoinOrderItems.map(lambda rec: ((rec[1][0].split(",")[1][:10], rec[1][0].split(",")[3]), float(rec[1][1].split(",")[4])))

# Per key, accumulate (revenue sum, item count) in a single pass.
revenueAndCount = joinMap.aggregateByKey((0.0, 0), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))

# saveAsTextFile returns None, so the old `dataRec =` binding was dead code.
revenueAndCount.map(lambda rec: (rec[0][0] + "," + rec[0][1] + "," + str(rec[1][0]) + "," + str(rec[1][1]))).saveAsTextFile("/user/cloudera/arun/ansOnewithRDD")
- -----------------------------------------------------------------------
-- Target table for the sqoop export below.
-- (order_date, order_status) uniquely identifies one aggregated row.
CREATE TABLE retail_db.result (
    order_date   VARCHAR(255) NOT NULL,
    order_status VARCHAR(255) NOT NULL,
    total_orders INT,
    total_amount NUMERIC,
    CONSTRAINT pk_order_result PRIMARY KEY (order_date, order_status)
);
# Export the aggregated CSV back into MySQL table retail_db.result.
# --columns maps the file's field order (date, status, amount, orders) onto
# the table, whose declared column order differs — this mapping is required.
# NOTE(review): --export-dir points at problem1/result4a-csv, but the text
# output above was written to /user/cloudera/problem1/text — confirm which
# directory holds the CSV to be exported.
sqoop export --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username retail_dba \
  --password cloudera \
  --table result \
  --export-dir "/user/cloudera/problem1/result4a-csv" \
  --columns "order_date,order_status,total_amount,total_orders"
Add Comment
Please sign in to add a comment.