Advertisement
Guest User

Untitled

a guest
Jul 3rd, 2015
201
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.04 KB | None | 0 0
  1. import org.apache.spark.streaming.dstream.DStream
  2. import org.apache.spark.streaming.dstream.DStream._
  3. import org.joda.time.DateTime
  4. import org.json4s.jackson.JsonMethods._
  5. import scala.util.Try
  6.  
  7. case class Purchase(item_id: String, amount: BigDecimal, time: Long)
  8. case class Key(item_id: String, time: DateTime)
  9. case class Summary(item_id: String, time: DateTime, total: BigDecimal)
  10.  
  11. object Example {
  12. implicit val formats = org.json4s.DefaultJsonFormats
  13.  
  14. def extract(message: String): Option[(Key, BigDecimal)] = {
  15. for {
  16. parsed <- Try(parse(message)).toOption
  17. purchase <- parsed.extractOpt[Purchase]
  18. } yield {
  19. val datetime = new DateTime(purchase.time)
  20. val roundedTime = datetime.withMinuteOfHour(0).withSecondOfMinute(0).withMillisOfSecond(0)
  21. Key(purchase.item_id, roundedTime) -> purchase.amount
  22. }
  23. }
  24.  
  25. def transformStream(stream: DStream[String]): DStream[Summary] = {
  26. stream
  27. .flatMap(extract)
  28. .reduceByKey(_ + _)
  29. .map { case (key, amount) =>
  30. Summary(key.item_id, key.time, amount)
  31. }
  32. }
  33. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement