DegreesOfSeparation
aiThanet, Feb 2nd, 2021

package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer

/** Finds the degrees of separation between two Marvel comic book characters, based
 *  on co-appearances in a comic.
 */
object DegreesOfSeparation {

  // The characters we want to find the separation between.
  val startCharacterID = 100  // SpiderMan
  val targetCharacterID = 949 // ADAM 3,031

  // We make our accumulator a "global" Option so we can reference it in a mapper later.
  var hitCounter: Option[LongAccumulator] = None

  // Some custom data types.
  // BFSData holds a node's array of hero ID connections, its distance, its color, and its parent ID.
  type BFSData = (Array[Int], Int, String, Int)
  // A BFSNode pairs a heroID with the BFSData associated with it.
  type BFSNode = (Int, BFSData)
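
  // Color semantics used throughout: WHITE = not yet visited, GRAY = frontier node to
  // be expanded on the next pass, BLACK = already expanded.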

  /** Converts a line of raw input into a BFSNode */
  def convertToBFS(line: String): BFSNode = {

    // Split up the line into fields
    val fields = line.split("\\s+")

    // Extract this hero ID from the first field
    val heroID = fields(0).toInt

    // Extract subsequent hero IDs into the connections array
    val connections: ArrayBuffer[Int] = ArrayBuffer()
    for (connection <- 1 until fields.length) {
      connections += fields(connection).toInt
    }

    // Defaults: distance 9999, color WHITE, and no known parent (-1)
    var color: String = "WHITE"
    var distance: Int = 9999
    var parent: Int = -1

    // Unless this is the character we're starting from
    if (heroID == startCharacterID) {
      color = "GRAY"
      distance = 0
      parent = startCharacterID
    }

    (heroID, (connections.toArray, distance, color, parent))
  }
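
  // For illustration (hypothetical IDs, assuming the whitespace-separated format parsed above):
  //   convertToBFS("5983 1165 3836 4361")
  //     == (5983, (Array(1165, 3836, 4361), 9999, "WHITE", -1))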

  /** Create "iteration 0" of our RDD of BFSNodes */
  def createStartingRdd(sc: SparkContext): RDD[BFSNode] = {
    val inputFile = sc.textFile("data/marvel-graph.txt")
    inputFile.map(convertToBFS)
  }

  /** Expands a BFSNode into this node and its children */
  def bfsMap(node: BFSNode): Array[BFSNode] = {

    // Extract data from the BFSNode
    val characterID: Int = node._1
    val data: BFSData = node._2

    val connections: Array[Int] = data._1
    val distance: Int = data._2
    var color: String = data._3
    val parent: Int = data._4

    // This is called from flatMap, so we return an array
    // of potentially many BFSNodes to add to our new RDD
    val results: ArrayBuffer[BFSNode] = ArrayBuffer()

    // Gray nodes are flagged for expansion, and create new
    // gray nodes for each connection
    if (color == "GRAY") {
      for (connection <- connections) {
        val newCharacterID = connection
        val newDistance = distance + 1
        val newColor = "GRAY"
        val newParent = characterID

        // Have we stumbled across the character we're looking for?
        // If so, increment our accumulator so the driver script knows.
        if (targetCharacterID == connection) {
          if (hitCounter.isDefined) {
            hitCounter.get.add(1)
          }
        }

        // Create our new gray node for this connection and add it to the results
        val newEntry: BFSNode = (newCharacterID, (Array(), newDistance, newColor, newParent))
        results += newEntry
      }

      // Color this node black, indicating it has been processed already.
      color = "BLACK"
    }

    // Add the original node back in, so its connections can get merged with
    // the gray nodes in the reducer.
    val thisEntry: BFSNode = (characterID, (connections, distance, color, parent))
    results += thisEntry

    results.toArray
  }
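
  // For illustration (hypothetical IDs): expanding the GRAY node
  //   (100, (Array(200, 300), 0, "GRAY", 100))
  // yields one new GRAY child per connection, plus the original node recolored BLACK:
  //   (200, (Array(), 1, "GRAY", 100)), (300, (Array(), 1, "GRAY", 100)),
  //   (100, (Array(200, 300), 0, "BLACK", 100))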

  /** Combines two copies of a node for the same heroID, preserving the shortest distance,
   *  the darkest color, and the parent that produced the shortest distance. */
  def bfsReduce(data1: BFSData, data2: BFSData): BFSData = {

    // Extract data that we are combining
    val edges1: Array[Int] = data1._1
    val edges2: Array[Int] = data2._1
    val distance1: Int = data1._2
    val distance2: Int = data2._2
    val color1: String = data1._3
    val color2: String = data2._3
    val parent1: Int = data1._4
    val parent2: Int = data2._4

    // Default node values
    var distance: Int = 9999
    var color: String = "WHITE"
    val edges: ArrayBuffer[Int] = ArrayBuffer()
    var parent: Int = parent1

    // See if one side is the original node carrying its connections.
    // If so, preserve them.
    if (edges1.length > 0) {
      edges ++= edges1
    }
    if (edges2.length > 0) {
      edges ++= edges2
    }

    // Preserve the minimum distance, along with the parent that produced it
    if (distance1 < distance) {
      distance = distance1
      parent = parent1
    }
    if (distance2 < distance) {
      distance = distance2
      parent = parent2
    }

    // Preserve the darkest color (BLACK beats GRAY beats WHITE)
    if (color1 == "WHITE" && (color2 == "GRAY" || color2 == "BLACK")) {
      color = color2
    }
    if (color1 == "GRAY" && color2 == "BLACK") {
      color = color2
    }
    if (color2 == "WHITE" && (color1 == "GRAY" || color1 == "BLACK")) {
      color = color1
    }
    if (color2 == "GRAY" && color1 == "BLACK") {
      color = color1
    }
    if (color1 == "GRAY" && color2 == "GRAY") {
      color = color1
    }
    if (color1 == "BLACK" && color2 == "BLACK") {
      color = color1
    }

    (edges.toArray, distance, color, parent)
  }
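
  // For illustration (hypothetical IDs): merging a node's original, still-WHITE copy with
  // a GRAY copy emitted by bfsMap,
  //   bfsReduce((Array(12, 13), 9999, "WHITE", -1), (Array(), 1, "GRAY", 100))
  // returns (Array(12, 13), 1, "GRAY", 100): the edge list, the shorter distance, the
  // darker color, and the parent that supplied that distance all survive.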

  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "DegreesOfSeparation")

    // Our accumulator, used to signal when we find the target
    // character in our BFS traversal.
    hitCounter = Some(sc.longAccumulator("Hit Counter"))

    var iterationRdd = createStartingRdd(sc)
    iterationRdd = iterationRdd.reduceByKey(bfsReduce)
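
    // Each pass of the loop below expands the current GRAY frontier by one hop
    // (flatMap with bfsMap) and then merges duplicates per hero ID (reduceByKey with
    // bfsReduce), until the accumulator signals that the target has been reached.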

    for (iteration <- 1 to 10) {
      println("Running BFS Iteration# " + iteration)

      // Create new vertices as needed to darken or reduce distances in the
      // reduce stage. If we encounter the node we're looking for as a GRAY
      // node, increment our accumulator to signal that we're done.
      val mapped = iterationRdd.flatMap(bfsMap)

      // Note that the mapped.count() action here forces the RDD to be evaluated,
      // and that's the only reason our accumulator is actually updated.
      println("Processing " + mapped.count() + " values.")

      // The reducer combines data for each character ID, preserving the darkest
      // color and the shortest path.
      iterationRdd = mapped.reduceByKey(bfsReduce)

      if (hitCounter.isDefined) {
        val hitCount = hitCounter.get.value
        if (hitCount > 0) {
          println("Hit the target character! From " + hitCount +
              " different direction(s).")

          // Fully processed (BLACK) nodes carry their final parent pointers, so we can
          // walk the parent chain from the target back to the start character.
          val completeNode = iterationRdd.filter(x => x._2._3 == "BLACK")

          var currentSearch = iterationRdd
            .filter(x => x._1 == targetCharacterID && x._2._3 != "WHITE")
            .first()._2._4

          println()
          print(s"Path : $targetCharacterID -> $currentSearch")

          while (currentSearch != startCharacterID) {
            currentSearch = completeNode.filter(x => x._1 == currentSearch).first()._2._4
            print(s" -> $currentSearch")
          }

          println()
          return
        }
      }
    }
  }
}
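
Note on running this (an assumption about the surrounding project, not stated in the paste): the SparkContext is created with master local[*] and reads data/marvel-graph.txt relative to the working directory, so the object can be launched directly from an IDE or with sbt, e.g. sbt "runMain com.sundogsoftware.spark.DegreesOfSeparation".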