Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.sql.*;
- /**
- * Created by saurab on 7/17/17.
- */
- public class HiveRead {
- public static void main(String[] args) {
- SparkSession spark = SparkSession
- .builder()
- .appName("Java Spark SQL basic example")
- .enableHiveSupport()
- .config("spark.sql.warehouse.dir", "hdfs://saurab:9000/user/hive/warehouse")
- .config("mapred.input.dir.recursive", true)
- .config("hive.mapred.supports.subdirectories", true)
- .config("hive.vectorized.execution.enabled", true)
- .master("local")
- .getOrCreate();
- // spark.sql("select bill_no bill, count(icode) icode from bigmart.o_sales group by bill_no").count();
- getUniqueCarts(spark);
- getUniqueItemFromCart(spark);
- finalUniqueItem(spark);
- getFinalTable(spark);
- }
- static void cacheTotalQty(SparkSession spark){
- try {
- spark.sql("select count(*) from bigmart.o_sales").createTempView("totalQty");
- spark.sqlContext().cacheTable("totalQty");
- } catch (AnalysisException e) {
- e.printStackTrace();
- }
- }
- static void getQtyContribution(SparkSession spark){
- cacheTotalQty(spark);
- try {
- spark.sql("select icode item ,count(icode) qty from bigmart.o_sales group by icode").createTempView("item");
- } catch (AnalysisException e) {
- e.printStackTrace();
- }
- try {
- spark.sql("select item,bround(qty/(select * from totalQty)*100,2) as qty_contrib from item").createTempView("QtyContribution");
- } catch (AnalysisException e) {
- e.printStackTrace();
- }
- }
- static void getUniqueCarts(SparkSession spark){
- try {
- spark.sql("select count(bill_no) from bigmart.o_sales ").createTempView("cart_count");
- } catch (AnalysisException e) {
- e.printStackTrace();
- }
- }
- //BM1103
- //BM8910
- static void getUniqueItemFromCart(SparkSession spark){
- try {
- spark.sql("select bill_no, first_value(icode) icode, count(icode) from bigmart.o_sales group by bill_no").createTempView("uniqueitems");
- } catch (AnalysisException e) {
- e.printStackTrace();
- }
- }
- static Dataset<Row> finalUniqueItem(SparkSession spark){
- try {
- spark.sql("select icode item, count(icode)/(select * from cart_count)*100 bp from uniqueitems group by icode").createTempView("final123");
- } catch (AnalysisException e) {
- e.printStackTrace();
- }
- return spark.sql("select * from final123");
- }
- static Dataset<Row> getRevenueContribution(SparkSession spark){
- try {
- spark.sql("select icode as item,cast(sum(total_amount) as long) mrp from bigmart.o_sales group by icode").createTempView("item1");
- } catch (AnalysisException e) {
- e.printStackTrace();
- }
- try {
- spark.sql("select item,bround(mrp/(select sum(total_amount) from bigmart.o_sales)*100,2) as rev_contrib from item1").createTempView("revenue_contrib");
- } catch (AnalysisException e) {
- e.printStackTrace();
- }
- return spark.sql("select * from revenue_contrib");
- }
- static void getFinalTable(SparkSession spark){
- getQtyContribution(spark);
- try{
- spark.sql("select * from QtyContribution").join(getRevenueContribution(spark), "item").join(finalUniqueItem(spark),"item").createTempView("hola");
- }catch(AnalysisException e){
- e.printStackTrace();
- }
- spark.sql("select item,qty_contrib*0.6,rev_contrib*0.2,bp*0.2 from hola").show();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement