Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.spark.{SparkContext, SparkConf}
- import org.apache.spark.graphx._
- import org.apache.spark.rdd.RDD
/**
 * GraphX/Pregel demo that propagates page-type paths through a small,
 * hard-coded graph.
 *
 * Every vertex carries a page type ("G", "S", "C" or "D"). The edge direction
 * is reversed before running Pregel, and each vertex attribute becomes a pair
 * of (page type, path), where the path initially holds only the vertex's own
 * page type. Vertices of type "C" seed the propagation; each receiving vertex
 * appends the incoming path to its own and forwards the result along its
 * out-edges. The final vertex attributes are printed to stdout.
 */
object TopPaths {

  /**
   * Builds the sample graph, runs the Pregel propagation for at most four
   * iterations and prints every resulting vertex.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("Hits"))
    // Stop the context even when the job fails so the local Spark runtime
    // is always released.
    try {
      val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
        (1L, "G"), (2L, "S"), (3L, "C"),
        (4L, "G"),
        (5L, "D"), (6L, "D"), (7L, "G"), (8L, "C"),
        (9L, "S"), (10L, "C"), (11L, "D"),
        (12L, "G"), (13L, "S"),
        (14L, "C")))
      val edges: RDD[Edge[String]] = sc.parallelize(Array(
        Edge(1L, 2L, ""), Edge(2L, 3L, ""),
        Edge(5L, 6L, ""), Edge(6L, 7L, ""), Edge(7L, 8L, ""),
        Edge(9L, 10L, ""), Edge(10L, 11L, ""),
        Edge(12L, 13L, "")
      ))

      // Edges are reversed so that messages travel from the original edge
      // destination back towards the original edge source.
      val graph = Graph(vertices, edges).reverse
        .mapVertices((id, pageType) => (pageType, List(pageType)))
      // Alternative kept from the original paste: run on the unreversed graph.
      // val graph = Graph(vertices, edges).
      //   mapVertices((id, pageType) => (pageType, List(pageType)))

      // The empty list is the initial message; vertexProgram ignores empty
      // messages, so the first superstep leaves every attribute unchanged.
      val pathGraph = Pregel(graph, List.empty[String],
        maxIterations = 4, activeDirection = EdgeDirection.Out)(
        vertexProgram, sendMessage, combineMessages)

      pathGraph.vertices.collect().foreach(println(_))
    } finally {
      sc.stop()
    }
  }

  /**
   * Vertex program: appends the received path to this vertex's path.
   *
   * @param id   id of the vertex being updated (unused)
   * @param attr current attribute as (page type, path accumulated so far)
   * @param msg  path received from a neighbour; empty for the initial message
   * @return the attribute with `msg` appended to its path, or `attr`
   *         unchanged when `msg` is empty
   */
  def vertexProgram(
      id: VertexId,
      attr: (String, List[String]),
      msg: List[String]): (String, List[String]) = {
    val (pageType, path) = attr
    if (msg.isEmpty) attr else (pageType, path ::: msg)
  }

  /**
   * Decides whether the source vertex of `edge` forwards its path to the
   * destination vertex, logging the decision to stdout.
   *
   * A message is sent when the source is a "C" vertex that still has its
   * initial single-element path, or when the source path has already grown
   * beyond one element. Otherwise nothing is sent.
   *
   * @param edge triplet whose source attribute is (page type, path)
   * @return an iterator with one (destination id, source path) message, or
   *         an empty iterator
   */
  def sendMessage(
      edge: EdgeTriplet[(String, List[String]), String]): Iterator[(VertexId, List[String])] = {
    val (pageType, path) = edge.srcAttr
    // List.size is linear, so compute it once.
    val pathLength = path.size
    if (pageType == "C" && pathLength == 1) {
      println(s"Sending initial message from $edge")
      Iterator((edge.dstId, path))
    } else if (pathLength > 1) {
      println(s"Sending message of size $pathLength to ${edge.dstId}")
      Iterator((edge.dstId, path))
    } else {
      println(s"No more messages from $edge")
      Iterator.empty
    }
  }

  /**
   * Message combiner: when two messages target the same vertex, keeps the
   * first argument and discards the second.
   *
   * NOTE(review): which message arrives as `msg1` is decided by Spark, so a
   * vertex with several inbound messages keeps an arbitrary one. In the
   * sample graph built by `main`, no vertex is the destination of more than
   * one edge.
   *
   * @param msg1 first path
   * @param msg2 second path (ignored)
   * @return `msg1`
   */
  def combineMessages(msg1: List[String], msg2: List[String]): List[String] = msg1
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement