Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Wraps a pair RDD so it can be full-outer-joined with another pair RDD sharing the same key type.
 *
 * @param rdd the left-hand pair RDD
 * @tparam K  key type (a `ClassManifest` is required)
 * @tparam V1 value type of the left-hand RDD (a `ClassManifest` is required)
 */
case class OuterJoinableRDD[K: ClassManifest, V1: ClassManifest](rdd: RDD[(K, V1)]) extends RDDWrapper[(K, V1)] {

  /**
   * Full outer join of `rdd` with `other`.
   *
   * Both sides are cogrouped with a `HashPartitioner(numPartitions)`. The output for each key is:
   *  - no right-side values: every left value paired with `None`;
   *  - no left-side values: `None` paired with every right value;
   *  - otherwise: the cross product of left and right values, each wrapped in `Some`.
   *
   * @param other         the right-hand pair RDD
   * @param numPartitions number of partitions for the `HashPartitioner` passed to `cogroup`
   * @tparam V2           value type of the right-hand RDD
   * @return (key, (optional left value, optional right value)) records
   */
  def outerJoin[V2](other: RDD[(K, V2)], numPartitions: Int): RDD[(K, (Option[V1], Option[V2]))] = {
    val grouped = rdd.cogroup(other, new HashPartitioner(numPartitions))
    grouped.flatMapValues {
      case (lefts, Seq()) =>
        lefts.iterator.map(l => (Some(l), None))
      case (Seq(), rights) =>
        rights.iterator.map(r => (None, Some(r)))
      case (lefts, rights) =>
        // Cross product; desugars to lefts.iterator.flatMap(l => rights.iterator.map(...)).
        for {
          l <- lefts.iterator
          r <- rights.iterator
        } yield (Some(l), Some(r))
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement