Advertisement
Guest User

graph.reverse

a guest
Apr 16th, 2014
45
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.06 KB | None | 0 0
  1.  
  2. import org.apache.spark.{SparkContext, SparkConf}
  3. import org.apache.spark.graphx._
  4. import org.apache.spark.rdd.RDD
  5.  
  /**
   * Small GraphX/Pregel experiment that propagates "page type" paths through a graph.
   *
   * Every vertex carries a page type ("G", "S", "C" or "D"). The graph is reversed
   * before running Pregel, so messages travel against the direction of the edges as
   * they are declared below. Propagation is seeded by vertices of type "C": each one
   * sends its single-element path along its outgoing (reversed) edges, and every
   * vertex that receives a path appends it to its own and forwards the result.
   */
  object TopPaths {

    /** Upper bound on the number of Pregel supersteps. */
    private val MaxIterations = 4

    /**
     * Builds the sample graph, runs the path propagation and prints every vertex
     * together with its final `(pageType, path)` attribute.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("Hits"))

      // Always release the context, even when the job fails part-way through.
      try {
        val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
          (1L, "G"), (2L, "S"), (3L, "C"),
          (4L, "G"),
          (5L, "D"), (6L, "D"), (7L, "G"), (8L, "C"),
          (9L, "S"), (10L, "C"), (11L, "D"),
          (12L, "G"), (13L, "S"),
          (14L, "C")))

        val edges: RDD[Edge[String]] = sc.parallelize(Array(
          Edge(1L, 2L, ""), Edge(2L, 3L, ""),
          Edge(5L, 6L, ""), Edge(6L, 7L, ""), Edge(7L, 8L, ""),
          Edge(9L, 10L, ""), Edge(10L, 11L, ""),
          Edge(12L, 13L, "")
        ))

        // Each vertex starts with a path containing only its own page type.
        // Dropping `.reverse` here runs the same propagation along the declared
        // edge direction instead.
        val graph = Graph(vertices, edges).reverse.
          mapVertices((id, pageType) => (pageType, List(pageType)))

        val pathGraph = Pregel(graph, List.empty[String],
          maxIterations = MaxIterations, activeDirection = EdgeDirection.Out)(
            vertexProgram, sendMessage, combineMessages)

        pathGraph.vertices.collect().foreach(println(_))
      } finally {
        sc.stop()
      }
    }

    /**
     * Vertex program: appends the received path to this vertex's own path.
     *
     * @param id   id of the vertex being updated (unused)
     * @param attr current `(pageType, path)` attribute of the vertex
     * @param msg  path received in this superstep; empty for the initial Pregel message
     * @return the attribute with `msg` appended to the path, or `attr` unchanged when
     *         `msg` is empty
     */
    def vertexProgram(id: VertexId, attr: (String, List[String]),
        msg: List[String]): (String, List[String]) = {
      if (msg.nonEmpty) (attr._1, attr._2 ::: msg) else attr
    }

    /**
     * Message generator: forwards the source vertex's path to the edge destination.
     *
     * A path is sent when the source is a "C" vertex that still holds only its own
     * type (the seed case), or when the source has already accumulated a path longer
     * than one element. All other sources stay silent.
     *
     * @param edge triplet whose source attribute is inspected
     * @return a single `(dstId, path)` message, or an empty iterator
     */
    def sendMessage(
        edge: EdgeTriplet[(String, List[String]), String]): Iterator[(VertexId, List[String])] = {
      val (pageType, path) = edge.srcAttr
      val pathLength = path.size

      if (pageType == "C" && pathLength == 1) {
        println("Sending initial message from " + edge)
        Iterator((edge.dstId, path))
      } else if (pathLength > 1) {
        println("Sending message of size " + pathLength + " to " + edge.dstId)
        Iterator((edge.dstId, path))
      } else {
        println("No more messages from " + edge)
        Iterator.empty
      }
    }

    /**
     * Merge function for messages addressed to the same vertex: keeps the first
     * path and discards the second.
     *
     * @param msg1 first path
     * @param msg2 second path (ignored)
     * @return `msg1`
     */
    def combineMessages(msg1: List[String], msg2: List[String]): List[String] = msg1
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement