Guest User

Untitled

a guest
Jun 20th, 2018
70
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.82 KB | None | 0 0
  /** Value-class wrapper around a raw site identifier.
    *
    * Wrapping the string gives site ids their own type, so they cannot be
    * passed where an `ItemId` (or any other plain `String`) is expected.
    *
    * @param v the raw identifier text
    */
  final case class SiteId(v: String) extends AnyVal {
    /** Renders as the bare identifier, without the `SiteId(...)` wrapper. */
    override def toString: String = v
  }
  4.  
  /** Value-class wrapper around a raw item identifier.
    *
    * Wrapping the string gives item ids their own type, so they cannot be
    * passed where a `SiteId` (or any other plain `String`) is expected.
    *
    * @param v the raw identifier text
    */
  final case class ItemId(v: String) extends AnyVal {
    /** Renders as the bare identifier, without the `ItemId(...)` wrapper. */
    override def toString: String = v
  }
  8.  
  /** A single item observation.
    *
    * @param siteId identifier of the site the item is listed on
    * @param itemId identifier of the item
    * @param price  the item's price, or `None` when no price is recorded
    */
  case class Item(
      siteId: SiteId,
      itemId: ItemId,
      price: Option[Double]
  )
  10.  
  11.  
  /** For every currently-available item, finds the earliest timestamp at which
    * it appears in the archive.
    *
    * Items are matched on `(siteId, itemId)` only; `price` plays no part in the
    * match. This is an inner join, so two kinds of item are dropped: items that
    * are available now but never archived, and archived items not flagged
    * available.
    *
    * @param archive      `(timestamp, item)` observations
    * @param availableNow snapshot of items paired with an availability flag
    * @param parallel     number of partitions used by every shuffle in this
    *                     method (`reduceByKey`, `distinct`, `join`).
    *                     NOTE(review): an implicit `Int` is easy to pick up by
    *                     accident; kept only for source compatibility with callers.
    * @return an `RDD[(SiteId, ItemId, Timestamp)]` holding exactly one
    *         `(siteId, itemId, firstSeenAt)` row per available, archived item
    */
  def getFirstOccurrence(
      archive: RDD[(Timestamp, Item)],
      availableNow: RDD[(Item, Boolean)]
  )(implicit parallel: Int) = {
    // Earliest archive timestamp per (siteId, itemId).
    val firstSeen = archive
      .map { case (existAt, item) => ((item.siteId, item.itemId), existAt) }
      .reduceByKey(
        (tsA: Timestamp, tsB: Timestamp) => if (tsA.toMillis < tsB.toMillis) tsA else tsB,
        parallel
      )

    // Keys of items flagged available. Filtering happens before the join so
    // unavailable rows are never shuffled. De-duplicating means an item listed
    // several times in the snapshot (e.g. at different prices) produces a
    // single output row instead of one row per listing.
    val availableKeys = availableNow
      .filter { case (_, isAvailable) => isAvailable }
      .map { case (item, _) => ((item.siteId, item.itemId), ()) }
      .distinct(parallel)

    // This expression is the method's result. If it were bound to a trailing
    // `val`, the method would return Unit and the RDD would be thrown away.
    firstSeen
      .join(availableKeys, parallel)
      .map { case ((siteId, itemId), (firstSeenAt, _)) => (siteId, itemId, firstSeenAt) }
  }
Add Comment
Please sign in to add a comment.