HiveRead.java

a guest
Jul 26th, 2017
371
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.83 KB | None | 0 0
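A Java Spark SQL job that reads the bigmart.o_sales table from a Hive warehouse and scores each item by combining three measures, each staged as a temporary view: quantity contribution (an item's share of all sales rows), revenue contribution (its share of summed total_amount), and bp (its share of carts in which it is the representative item). getFinalTable() joins the three views on item and prints a 0.6/0.2/0.2 weighted blend.
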
import org.apache.spark.sql.*;

/**
 * Created by saurab on 7/17/17.
 */
public class HiveRead {
    public static void main(String[] args) {
        // Spark session with Hive support; the warehouse path points at an HDFS namenode.
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .enableHiveSupport()
                .config("spark.sql.warehouse.dir", "hdfs://saurab:9000/user/hive/warehouse")
                .config("mapred.input.dir.recursive", true)
                .config("hive.mapred.supports.subdirectories", true)
                .config("hive.vectorized.execution.enabled", true)
                .master("local")
                .getOrCreate();
        // spark.sql("select bill_no bill, count(icode) icode from bigmart.o_sales group by bill_no").count();

        getUniqueCarts(spark);
        getUniqueItemFromCart(spark);
        // getFinalTable() invokes finalUniqueItem() itself; calling it here as well
        // would try to create the final123 temp view twice and raise an AnalysisException.
        getFinalTable(spark);
    }

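    // Stores the total row count of bigmart.o_sales as a one-value temp view and
    // caches it, since the quantity-contribution query below reads it repeatedly.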
    static void cacheTotalQty(SparkSession spark) {
        try {
            spark.sql("select count(*) from bigmart.o_sales").createTempView("totalQty");
            spark.sqlContext().cacheTable("totalQty");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
    }

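    // Per-item quantity contribution: each item's sales count as a percentage of
    // the cached total row count, rounded to two places with bround().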
    static void getQtyContribution(SparkSession spark) {
        cacheTotalQty(spark);
        try {
            spark.sql("select icode item, count(icode) qty from bigmart.o_sales group by icode").createTempView("item");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
        try {
            spark.sql("select item, bround(qty/(select * from totalQty)*100, 2) as qty_contrib from item").createTempView("QtyContribution");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
    }

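    // Number of distinct carts, used as the denominator for bp in finalUniqueItem().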
    static void getUniqueCarts(SparkSession spark) {
        try {
            // count(distinct bill_no): a plain count(bill_no) would count every
            // sales row rather than every cart.
            spark.sql("select count(distinct bill_no) from bigmart.o_sales").createTempView("cart_count");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
    }

    // BM1103
    // BM8910
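    // One row per cart: the first icode in each bill_no group plus that cart's item
    // count. Note first_value() without an ORDER BY is nondeterministic, so the
    // "representative" item depends on scan order.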
    static void getUniqueItemFromCart(SparkSession spark) {
        try {
            spark.sql("select bill_no, first_value(icode) icode, count(icode) from bigmart.o_sales group by bill_no").createTempView("uniqueitems");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
    }

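    // bp: for each item, the percentage of carts whose representative row in
    // uniqueitems carries that icode.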
    static Dataset<Row> finalUniqueItem(SparkSession spark) {
        try {
            spark.sql("select icode item, count(icode)/(select * from cart_count)*100 bp from uniqueitems group by icode").createTempView("final123");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
        return spark.sql("select * from final123");
    }

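    // Per-item revenue contribution: each item's summed total_amount as a
    // percentage of revenue across all of bigmart.o_sales.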
    static Dataset<Row> getRevenueContribution(SparkSession spark) {
        try {
            spark.sql("select icode as item, cast(sum(total_amount) as long) mrp from bigmart.o_sales group by icode").createTempView("item1");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
        try {
            spark.sql("select item, bround(mrp/(select sum(total_amount) from bigmart.o_sales)*100, 2) as rev_contrib from item1").createTempView("revenue_contrib");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
        return spark.sql("select * from revenue_contrib");
    }

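    // Joins the three per-item measures on "item" and prints a weighted blend:
    // 60% quantity contribution, 20% revenue contribution, 20% bp.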
    static void getFinalTable(SparkSession spark) {
        getQtyContribution(spark);
        try {
            spark.sql("select * from QtyContribution")
                    .join(getRevenueContribution(spark), "item")
                    .join(finalUniqueItem(spark), "item")
                    .createTempView("hola");
        } catch (AnalysisException e) {
            e.printStackTrace();
        }
        spark.sql("select item, qty_contrib*0.6, rev_contrib*0.2, bp*0.2 from hola").show();
    }
}
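
For reference, the same 0.6/0.2/0.2 blend that getFinalTable() expresses in SQL could also be written with the Dataset column API. A minimal sketch, not part of the original paste; weightedScores is a hypothetical helper that would live inside the class and assumes the "hola" view has already been registered:

    // Hypothetical alternative to the SQL in getFinalTable(), using the Dataset API.
    // Assumes getFinalTable() (or equivalent) has already created the "hola" view.
    static Dataset<Row> weightedScores(SparkSession spark) {
        return spark.table("hola").select(
                functions.col("item"),
                functions.col("qty_contrib").multiply(0.6).alias("qty_score"),
                functions.col("rev_contrib").multiply(0.2).alias("rev_score"),
                functions.col("bp").multiply(0.2).alias("bp_score"));
    }

Named columns (qty_score, rev_score, bp_score) also give the output more readable headers than the unaliased expressions the SQL version prints.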