Advertisement
Guest User

Untitled

a guest
Jul 27th, 2017
62
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.95 KB | None | 0 0
|City|    Month|Sale|
+----+---------+----+
|  c1| JAN-2017|  49|
|  c1| FEB-2017|  46|
|  c1| MAR-2017|  83|
|  c2| JAN-2017|  59|
|  c2| MAY-2017|  60|
|  c2| JUN-2017|  49|
|  c2| JUL-2017|  73|
+----+---------+----+
  11.  
|City|    Month|Sale|previous_sale|
+----+---------+----+-------------+
|  c1| JAN-2017|  49|         NULL|
|  c1| FEB-2017|  46|           49|
|  c1| MAR-2017|  83|           46|
|  c2| JAN-2017|  59|         NULL|
|  c2| MAY-2017|  60|           59|
|  c2| JUN-2017|  49|           60|
|  c2| JUL-2017|  73|           49|
+----+---------+----+-------------+
  22.  
  23. import spark.implicits._
  24. val df = spark.sparkContext.parallelize(Seq(
  25. ("c1", "JAN-2017", 49),
  26. ("c1", "FEB-2017", 46),
  27. ("c1", "MAR-2017", 83),
  28. ("c2", "JAN-2017", 59),
  29. ("c2", "MAY-2017", 60),
  30. ("c2", "JUN-2017", 49),
  31. ("c2", "JUL-2017", 73)
  32. )).toDF("city", "month", "sales")
  33.  
  34. val window = Window.partitionBy("city").orderBy("month")
  35.  
  36. df.withColumn("previous_sale", lag($"sales", 1, null).over(window)).show
  37.  
  38. +----+--------+-----+----+
  39. |city| month|sales| previous_sale|
  40. +----+--------+-----+----+
  41. | c1|FEB-2017| 46|null|
  42. | c1|JAN-2017| 49| 46|
  43. | c1|MAR-2017| 83| 49|
  44. | c2|JAN-2017| 59|null|
  45. | c2|JUL-2017| 73| 59|
  46. | c2|JUN-2017| 49| 73|
  47. | c2|MAY-2017| 60| 49|
  48. +----+--------+-----+----+
  49.  
  50. val fullDate = udf((value :String )=>
  51. {
  52. val months = List("JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")
  53. val splited = value.split("-")
  54. new Date(splited(1).toInt, months.indexOf(splited(0)) + 1, 1)
  55. })
  56.  
  57. df.withColumn("month", fullDate($"month")).show()
  58.  
  59. package com.incedo.pharma
  60. import org.apache.spark.SparkConf
  61. import org.apache.spark.SparkContext
  62. import org.apache.spark.sql.SQLContext
  63. import org.apache.spark.SparkContext._
  64. import org.apache.spark.sql.functions._
  65. import org.apache.spark.sql.functions.unix_timestamp
  66. import org.apache.spark.sql.functions.udf
  67. import org.apache.spark.sql.SparkSession
  68. //import org.apache.spark.sql.expressions.Window
  69. import java.sql.Date
  70.  
  71. class SecondTask {
  72. def previousMonthSale(){
  73. val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
  74. val sc = new SparkContext(conf)
  75. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  76. val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
  77. import ss.sqlContext.implicits._
  78. val df = sqlContext.read.format("com.databricks.spark.csv")
  79. .option("header", "true")
  80. .option("inferSchema", "true")
  81. .option("delimiter", "|")
  82. .load("taskTwoData.csv")
  83. val df1 = df.withColumn("Timestamp", unix_timestamp(df("Date"), "MM/dd/yyyy").cast("timestamp"))
  84. val df2 = df1.withColumn("Month",month(df1("Timestamp")))
  85. val df3 = df2.groupBy("City", "Month").agg(sum(col("Sale")).alias("Current_Month_Total_Sale")).orderBy("City","Month")
  86. val df4 = df3.withColumn("pre_month",df3("Month")-1)
  87. val df5 = df4.alias("a").join(df3.alias("b"),$"a.pre_month" === $"b.Month" && $"a.City" === $"b.City","left_outer")
  88. .select($"a.City",$"a.Month",$"a.Current_Month_Total_Sale",($"b.Current_Month_Total_Sale")
  89. .alias("Previous_Month_Total_Sale")).na.fill(0,Seq("Previous_Month_Total_Sale"))
  90.  
  91. val df6 = df5.withColumn("Percent_Sale_Change",round(((df5("Current_Month_Total_Sale") - df5("Previous_Month_Total_Sale"))/df5("Current_Month_Total_Sale"))*100,2))
  92. val df7 = df6.groupBy("City").max("Current_Month_Total_Sale").alias("Max_Sale").orderBy("City")
  93. //df7.show()
  94. val df8 = df6.join(df7, Seq("City"))
  95. val df9 = df8.withColumn("Percent_Sale_By_Max_Sale", round(df8("Current_Month_Total_Sale")/df8("max(Current_Month_Total_Sale)"),2))
  96. .drop("max(Current_Month_Total_Sale)")
  97. df9.toDF().show()
  98. }
  99. }
  100.  
  101. object taskTwo {
  102. def main(arr: Array[String]) {
  103. new SecondTask().previousMonthSale()
  104. }`enter code here`
  105. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement