Advertisement
Guest User

OuterJoinableRDD

a guest
Jul 9th, 2014
348
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 0.52 KB | None | 0 0
  /**
   * Wrapper around a pair RDD that adds a full outer join.
   *
   * @param rdd the left-hand pair RDD to join from
   * @tparam K  key type shared by both sides of the join
   * @tparam V1 value type of the left-hand RDD
   */
  case class OuterJoinableRDD[K: ClassManifest, V1: ClassManifest](rdd: RDD[(K, V1)])
    extends RDDWrapper[(K, V1)] {

    /**
     * Full outer join of `rdd` with `other`, built on `cogroup`.
     *
     * For each key, the grouped left and right values are combined as follows:
     *  - no right values: every left value is emitted as `(Some(left), None)`;
     *  - no left values: every right value is emitted as `(None, Some(right))`;
     *  - otherwise: every (left, right) combination is emitted as
     *    `(Some(left), Some(right))`.
     *
     * @param other         the right-hand pair RDD
     * @param numPartitions partition count given to the `HashPartitioner` used by the cogroup
     * @tparam V2           value type of the right-hand RDD
     * @return one record per joined pair, with `None` standing in for the missing side
     */
    def outerJoin[V2](other: RDD[(K, V2)], numPartitions: Int): RDD[(K, (Option[V1], Option[V2]))] = {
      val partitioner = new HashPartitioner(numPartitions)
      rdd.cogroup(other, partitioner).flatMapValues[(Option[V1], Option[V2])] { grouped =>
        val (lefts, rights) = grouped
        // The "right side empty" check comes first, mirroring the original case order.
        if (rights.isEmpty) {
          lefts.iterator.map(left => (Some(left), None))
        } else if (lefts.isEmpty) {
          rights.iterator.map(right => (None, Some(right)))
        } else {
          for {
            left <- lefts.iterator
            right <- rights.iterator
          } yield (Some(left), Some(right))
        }
      }
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement