Advertisement
Guest User

Untitled

a guest
Dec 5th, 2016
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.44 KB | None | 0 0
  1. import java.sql.Timestamp
  2. import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
  3. import com.cloudera.sparkts._
  4. import org.apache.spark.sql.SparkSession
  5. import org.apache.spark.sql.functions.udf
  6. import org.apache.spark.sql.functions._
  7.  
  8. /**
  9. * Created by josep2 on 12/5/16.
  10. */
  11.  
  12.  
  /**
   * One raw sales observation as read from the comma-separated input file.
   *
   * The field names double as DataFrame column names once a collection of these
   * is converted with `toDF()`, and the job refers to them by string
   * ("week_beginning", "id", "sales"), so they must not be renamed.
   *
   * NOTE(review): the file also wildcard-imports `com.cloudera.sparkts._`; if that
   * package exposes a type with this same name, this definition shadows it inside
   * this file -- confirm that is intended.
   *
   * @param id             key identifying the series the observation belongs to
   * @param week_beginning observation date as text; the job appends "T00:00:00" and
   *                       parses it with `LocalDateTime.parse`
   * @param sales          observed value for that week
   */
  final case class TimeSeries(
    id: String,
    week_beginning: String,
    sales: Double
  )
  /**
   * Reads weekly sales observations from `./data.txt` and arranges them into a
   * spark-ts `TimeSeriesRDD` keyed by `id` over a fixed 7-day index.
   *
   * Uses an explicit `main` instead of `extends App`: the Spark documentation
   * advises against `scala.App` for Spark applications because its delayed
   * initialisation may not work correctly.
   */
  object TimeSeriesBlog {

    /**
     * Entry point.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      // NOTE(review): per the Spark configuration docs, `spark.driver.memory` set from
      // inside the application has no effect in client/local mode because the driver
      // JVM is already running; kept as-is, but consider moving it to spark-submit.
      val sparkSession = SparkSession.builder
        .master("local[4]")
        .appName("Panel Engine")
        .config("spark.driver.memory", "1g")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()

      // Always release the session, even when reading or parsing the input fails.
      try {
        import sparkSession.implicits._

        // Resolved once so the observation timestamps and the index agree on the zone.
        val zone = ZoneId.systemDefault()

        // Turns a date string into a Timestamp at midnight of that day in `zone`.
        // A `val`, so the UDF is built once rather than on every reference.
        val makeDate = udf((s: String) =>
          Timestamp.from(ZonedDateTime.of(LocalDateTime.parse(s + "T00:00:00"), zone).toInstant))

        // Each input line is split on "," into (id, week_beginning, sales).
        // A line with fewer than three fields or a non-numeric third field makes the
        // job fail when the data is eventually evaluated.
        val rawObservations = sparkSession.sparkContext.textFile("./data.txt")
          .map(_.split(","))
          .map(row => TimeSeries(row(0), row(1), row(2).toDouble))
          .toDF()

        // Replace the textual date column with its Timestamp form.
        val observations =
          rawObservations.withColumn("week_beginning", makeDate(rawObservations("week_beginning")))

        // Index with a 7-day step between 2016-01-04 and 2016-04-04.
        val dtIndex = DateTimeIndex.uniformFromInterval(
          ZonedDateTime.of(LocalDateTime.parse("2016-01-04T00:00:00"), zone),
          ZonedDateTime.of(LocalDateTime.parse("2016-04-04T00:00:00"), zone),
          new DayFrequency(7))

        // Timestamp column, key column, value column -- in that order.
        // The result is only constructed; nothing here consumes it afterwards.
        val dataRdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, observations,
          "week_beginning", "id", "sales")
      } finally {
        sparkSession.stop()
      }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement