Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.streaming.dstream.DStream
- import org.apache.spark.streaming.dstream.DStream._
- import org.joda.time.DateTime
- import org.json4s.jackson.JsonMethods._
- import scala.util.Try
/**
 * A single purchase event, as extracted from an incoming JSON message.
 *
 * Field names use snake_case because they are matched against the JSON field names
 * during extraction.
 *
 * @param item_id identifier of the purchased item
 * @param amount  amount of the purchase
 * @param time    purchase timestamp; `Example.extract` passes it to Joda's `DateTime(long)`
 *                constructor, i.e. it is treated as milliseconds since the epoch
 */
case class Purchase(
  item_id: String,
  amount: BigDecimal,
  time: Long
)
/**
 * Grouping key used when summing purchase amounts.
 *
 * @param item_id identifier of the purchased item
 * @param time    time bucket; `Example.extract` fills it with the purchase time whose
 *                minute, second and millisecond fields have been set to zero
 */
case class Key(
  item_id: String,
  time: DateTime
)
/**
 * Aggregated result emitted by `Example.transformStream` for one [[Key]].
 *
 * @param item_id identifier of the purchased item (copied from the key)
 * @param time    time bucket the total belongs to (copied from the key)
 * @param total   sum of the purchase amounts that were reduced under that key
 */
case class Summary(
  item_id: String,
  time: DateTime,
  total: BigDecimal
)
/**
 * Turns a stream of JSON-encoded purchase messages into per-item, per-hour totals.
 */
object Example {

  // json4s extraction (`extractOpt`) needs an implicit `Formats` in scope, and
  // `DefaultFormats` is the stock implementation. `DefaultJsonFormats` (used previously)
  // belongs to the separate type-class Reader/Writer API and is not a `Formats`.
  // Fully qualified so that no additional import is required.
  implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats

  /**
   * Parses one JSON message into an aggregation key and the purchase amount.
   *
   * @param message raw JSON text expected to describe a [[Purchase]]
   * @return `Some(key -> amount)`, where the key's time is the purchase time with minute,
   *         second and millisecond set to zero; `None` when `parse` throws or when
   *         `extractOpt` does not yield a [[Purchase]]
   */
  def extract(message: String): Option[(Key, BigDecimal)] = {
    for {
      // Try(...) converts a parse failure on malformed input into None instead of throwing.
      parsed <- Try(parse(message)).toOption
      purchase <- parsed.extractOpt[Purchase]
    } yield {
      // NOTE(review): `new DateTime(long)` uses the JVM default time zone, so the hour
      // bucket (and the equality of the resulting keys) depends on where this code runs —
      // confirm that all JVMs involved share the same default zone.
      val datetime = new DateTime(purchase.time)
      val roundedTime = datetime
        .withMinuteOfHour(0)
        .withSecondOfMinute(0)
        .withMillisOfSecond(0)
      Key(purchase.item_id, roundedTime) -> purchase.amount
    }
  }

  /**
   * Builds the aggregation pipeline: messages that `extract` rejects are dropped, the
   * amounts of the remaining ones are summed per [[Key]] with `reduceByKey`, and each
   * (key, total) pair is converted to a [[Summary]].
   *
   * @param stream stream of raw JSON messages
   * @return stream of per-key summaries
   */
  def transformStream(stream: DStream[String]): DStream[Summary] = {
    stream
      .flatMap(extract)
      .reduceByKey(_ + _)
      .map { case (key, amount) =>
        Summary(key.item_id, key.time, amount)
      }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement