Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Rebuild the edge RDD so that every edge lives in the partition selected by
// `partitionStrategy`, then cache the result.
val newEdges = {
  // Stage 1: key each edge by the partition id the strategy assigns to its
  // (srcId, dstId) pair; the value keeps only the fields needed to rebuild it.
  val keyedEdges = edges.map { edge =>
    val targetPartition: PartitionID =
      partitionStrategy.getPartition(edge.srcId, edge.dstId, numPartitions)
    (targetPartition, (edge.srcId, edge.dstId, edge.attr))
  }

  // Stage 2: shuffle the keyed edges across `numPartitions` partitions using
  // a HashPartitioner on the partition-id key.
  val shuffledEdges = keyedEdges.partitionBy(new HashPartitioner(numPartitions))

  // Stage 3: collapse each shuffled partition into a single
  // (partition index, edge partition) pair. The partition-id key is no longer
  // needed at this point, so it is discarded by the pattern.
  val rebuiltPartitions = shuffledEdges.mapPartitionsWithIndex(
    { (pid, keyedIter) =>
      val partitionBuilder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
      keyedIter.foreach { case (_, (srcId, dstId, attr)) =>
        partitionBuilder.add(srcId, dstId, attr)
      }
      Iterator((pid, partitionBuilder.toEdgePartition))
    },
    preservesPartitioning = true)

  edges.withPartitionsRDD(rebuiltPartitions).cache()
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement