Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
/** Identifier of a site, wrapped in a value class so it cannot be confused with other
  * string identifiers at compile time.
  *
  * @param v the raw identifier
  */
case class SiteId(v: String) extends AnyVal {
  // Render as the bare identifier rather than the default `SiteId(...)` form.
  override def toString: String = v
}
/** Identifier of an item, wrapped in a value class so it cannot be confused with other
  * string identifiers at compile time.
  *
  * @param v the raw identifier
  */
case class ItemId(v: String) extends AnyVal {
  // Render as the bare identifier rather than the default `ItemId(...)` form.
  override def toString: String = v
}
/** An item record.
  *
  * @param siteId identifier of the site
  * @param itemId identifier of the item
  * @param price  the price, or `None` when absent
  */
case class Item(
    siteId: SiteId,
    itemId: ItemId,
    price: Option[Double]
)
/** For every `(siteId, itemId)` pair flagged as available in `availableNow`, finds the
  * earliest timestamp at which that pair appears in `archive`.
  *
  * Items are matched on `(siteId, itemId)` only; `price` does not take part in the match.
  * Pairs that appear in only one of the two inputs are dropped (inner join).
  *
  * @param archive      items paired with the timestamp at which they were recorded
  * @param availableNow items paired with an availability flag; only `true` entries are kept
  * @param parallel     number of partitions used by the `reduceByKey` and `join` shuffles
  * @return `(siteId, itemId, earliestTimestamp)` rows; a pair listed more than once as
  *         available in `availableNow` yields one row per such listing
  */
def getFirstOccurrence(
    archive: RDD[(Timestamp, Item)],
    availableNow: RDD[(Item, Boolean)]
)(implicit parallel: Int): RDD[(SiteId, ItemId, Timestamp)] = {
  // Keeps the smaller timestamp; on equal millis the second argument wins (as originally).
  // An explicitly typed function value avoids overload ambiguity in reduceByKey.
  val earlier = (tsA: Timestamp, tsB: Timestamp) =>
    if (tsA.toMillis < tsB.toMillis) tsA else tsB

  val firstSeen = archive
    .map { case (seenAt, item) => ((item.siteId, item.itemId), seenAt) }
    .reduceByKey(earlier, parallel)

  // Drop unavailable entries before the join so they are never shuffled.
  val availableKeys = availableNow
    .filter { case (_, isAvailable) => isAvailable }
    .map { case (item, _) => ((item.siteId, item.itemId), ()) }

  // Returned directly: previously this chain was bound to a trailing `val`, which made
  // the method return Unit and discard the (lazy, never-executed) RDD.
  firstSeen
    .join(availableKeys, parallel)
    .map { case ((siteId, itemId), (createdAt, _)) => (siteId, itemId, createdAt) }
}
Add Comment
Please sign in to add a comment.