Advertisement
Guest User

Untitled

a guest
Aug 7th, 2014
197
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 0.63 KB | None | 0 0
  1.  val newEdges = edges.withPartitionsRDD(edges.map { e =>
  2.       val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
  3.       (part, (e.srcId, e.dstId, e.attr))
  4.     }
  5.       .partitionBy(new HashPartitioner(numPartitions))
  6.       .mapPartitionsWithIndex( { (pid, iter) =>
  7.         val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
  8.         iter.foreach { message =>
  9.           val data = message._2
  10.           builder.add(data._1, data._2, data._3)
  11.         }
  12.         val edgePartition = builder.toEdgePartition
  13.         Iterator((pid, edgePartition))
  14.       }, preservesPartitioning = true)).cache()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement