Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- +----+--------+----+
- |City|   Month|Sale|
- +----+--------+----+
- |  c1|JAN-2017|  49|
- |  c1|FEB-2017|  46|
- |  c1|MAR-2017|  83|
- |  c2|JAN-2017|  59|
- |  c2|MAY-2017|  60|
- |  c2|JUN-2017|  49|
- |  c2|JUL-2017|  73|
- +----+--------+----+
- +----+--------+----+-------------+
- |City|   Month|Sale|previous_sale|
- +----+--------+----+-------------+
- |  c1|JAN-2017|  49|         NULL|
- |  c1|FEB-2017|  46|           49|
- |  c1|MAR-2017|  83|           46|
- |  c2|JAN-2017|  59|         NULL|
- |  c2|MAY-2017|  60|           59|
- |  c2|JUN-2017|  49|           60|
- |  c2|JUL-2017|  73|           49|
- +----+--------+----+-------------+
- import spark.implicits._
- val df = spark.sparkContext.parallelize(Seq(
- ("c1", "JAN-2017", 49),
- ("c1", "FEB-2017", 46),
- ("c1", "MAR-2017", 83),
- ("c2", "JAN-2017", 59),
- ("c2", "MAY-2017", 60),
- ("c2", "JUN-2017", 49),
- ("c2", "JUL-2017", 73)
- )).toDF("city", "month", "sales")
- val window = Window.partitionBy("city").orderBy("month")
- df.withColumn("previous_sale", lag($"sales", 1, null).over(window)).show
- +----+--------+-----+-------------+
- |city|   month|sales|previous_sale|
- +----+--------+-----+-------------+
- |  c1|FEB-2017|   46|         null|
- |  c1|JAN-2017|   49|           46|
- |  c1|MAR-2017|   83|           49|
- |  c2|JAN-2017|   59|         null|
- |  c2|JUL-2017|   73|           59|
- |  c2|JUN-2017|   49|           73|
- |  c2|MAY-2017|   60|           49|
- +----+--------+-----+-------------+
- val fullDate = udf((value :String )=>
- {
- val months = List("JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")
- val splited = value.split("-")
- new Date(splited(1).toInt, months.indexOf(splited(0)) + 1, 1)
- })
- df.withColumn("month", fullDate($"month")).show()
- package com.incedo.pharma
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.sql.SQLContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.sql.functions._
- import org.apache.spark.sql.functions.unix_timestamp
- import org.apache.spark.sql.functions.udf
- import org.apache.spark.sql.SparkSession
- //import org.apache.spark.sql.expressions.Window
- import java.sql.Date
- class SecondTask {
- def previousMonthSale(){
- val conf = new SparkConf().setAppName("Excel-read-write").setMaster("local")
- val sc = new SparkContext(conf)
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- val ss = SparkSession.builder().master("local").appName("Excel-read-write").getOrCreate()
- import ss.sqlContext.implicits._
- val df = sqlContext.read.format("com.databricks.spark.csv")
- .option("header", "true")
- .option("inferSchema", "true")
- .option("delimiter", "|")
- .load("taskTwoData.csv")
- val df1 = df.withColumn("Timestamp", unix_timestamp(df("Date"), "MM/dd/yyyy").cast("timestamp"))
- val df2 = df1.withColumn("Month",month(df1("Timestamp")))
- val df3 = df2.groupBy("City", "Month").agg(sum(col("Sale")).alias("Current_Month_Total_Sale")).orderBy("City","Month")
- val df4 = df3.withColumn("pre_month",df3("Month")-1)
- val df5 = df4.alias("a").join(df3.alias("b"),$"a.pre_month" === $"b.Month" && $"a.City" === $"b.City","left_outer")
- .select($"a.City",$"a.Month",$"a.Current_Month_Total_Sale",($"b.Current_Month_Total_Sale")
- .alias("Previous_Month_Total_Sale")).na.fill(0,Seq("Previous_Month_Total_Sale"))
- val df6 = df5.withColumn("Percent_Sale_Change",round(((df5("Current_Month_Total_Sale") - df5("Previous_Month_Total_Sale"))/df5("Current_Month_Total_Sale"))*100,2))
- val df7 = df6.groupBy("City").max("Current_Month_Total_Sale").alias("Max_Sale").orderBy("City")
- //df7.show()
- val df8 = df6.join(df7, Seq("City"))
- val df9 = df8.withColumn("Percent_Sale_By_Max_Sale", round(df8("Current_Month_Total_Sale")/df8("max(Current_Month_Total_Sale)"),2))
- .drop("max(Current_Month_Total_Sale)")
- df9.toDF().show()
- }
- }
- object taskTwo {
- def main(arr: Array[String]) {
- new SecondTask().previousMonthSale()
- }`enter code here`
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement