aiThanet

DegreesOfSeparation

Feb 2nd, 2021
package com.sundogsoftware.spark

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.util.LongAccumulator
import org.apache.log4j._
import scala.collection.mutable.ArrayBuffer

/** Finds the degrees of separation between two Marvel comic book characters, based
 *  on co-appearances in a comic.
 */
object DegreesOfSeparation {

  // The characters we want to find the separation between.
  val startCharacterID = 100 // SpiderMan
  val targetCharacterID = 949 // ADAM 3,031 (who?)

  // We make our accumulator a "global" Option so we can reference it in a mapper later.
  var hitCounter: Option[LongAccumulator] = None

  // Some custom data types
  // BFSData contains an array of hero ID connections, the distance, the color, and the parent hero ID.
  type BFSData = (Array[Int], Int, String, Int)
  // A BFSNode has a heroID and the BFSData associated with it.
  type BFSNode = (Int, BFSData)

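  // Coloring convention used throughout: WHITE = not yet reached, GRAY = on the
  // current BFS frontier (still to be expanded), BLACK = already expanded.
  // Distance 9999 and parent -1 act as "unknown" sentinels until a node is reached.
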
  /** Converts a line of raw input into a BFSNode */
  def convertToBFS(line: String): BFSNode = {

    // Split up the line into fields
    val fields = line.split("\\s+")

    // Extract this hero ID from the first field
    val heroID = fields(0).toInt

    // Extract subsequent hero IDs into the connections array
    val connections: ArrayBuffer[Int] = ArrayBuffer()
    for (connection <- 1 until fields.length) {
      connections += fields(connection).toInt
    }

    // Default distance, color, and parent are 9999, white, and -1 (unknown)
    var color: String = "WHITE"
    var distance: Int = 9999
    var parent: Int = -1

    // Unless this is the character we're starting from
    if (heroID == startCharacterID) {
      color = "GRAY"
      distance = 0
      parent = startCharacterID
    }

    (heroID, (connections.toArray, distance, color, parent))
  }

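  // A quick illustration (the IDs here are hypothetical, but the whitespace-separated
  // "heroID connection1 connection2 ..." format is what the parser above assumes):
  //   convertToBFS("5983 1165 3836 4361")
  //     returns (5983, (Array(1165, 3836, 4361), 9999, "WHITE", -1))
  // If the first field were startCharacterID (100), the node would instead get
  // distance 0, color "GRAY", and parent 100.
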
  /** Create "iteration 0" of our RDD of BFSNodes */
  def createStartingRdd(sc: SparkContext): RDD[BFSNode] = {
    val inputFile = sc.textFile("data/marvel-graph.txt")
    inputFile.map(convertToBFS)
  }

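  // At "iteration 0" only the start character (100) is GRAY with distance 0;
  // every other hero parsed from the file is WHITE with distance 9999. A hero may
  // appear on more than one input line, so duplicate keys can exist here, which is
  // why main() runs an initial reduceByKey(bfsReduce) before iterating.
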
  /** Expands a BFSNode into this node and its children */
  def bfsMap(node: BFSNode): Array[BFSNode] = {

    // Extract data from the BFSNode
    val characterID: Int = node._1
    val data: BFSData = node._2

    val connections: Array[Int] = data._1
    val distance: Int = data._2
    var color: String = data._3
    val parent: Int = data._4

    // This is called from flatMap, so we return an array
    // of potentially many BFSNodes to add to our new RDD
    val results: ArrayBuffer[BFSNode] = ArrayBuffer()

    // Gray nodes are flagged for expansion, and create new
    // gray nodes for each connection
    if (color == "GRAY") {
      for (connection <- connections) {
        val newCharacterID = connection
        val newDistance = distance + 1
        val newColor = "GRAY"
        val newParent = characterID

        // Have we stumbled across the character we're looking for?
        // If so, increment our accumulator so the driver script knows.
        if (targetCharacterID == connection) {
          if (hitCounter.isDefined) {
            hitCounter.get.add(1)
          }
        }

        // Create our new gray node for this connection and add it to the results
        val newEntry: BFSNode = (newCharacterID, (Array(), newDistance, newColor, newParent))
        results += newEntry
      }

      // Color this node black, indicating it has been processed already.
      color = "BLACK"
    }

    // Add the original node back in, so its connections can get merged with
    // the gray nodes in the reducer.
    val thisEntry: BFSNode = (characterID, (connections, distance, color, parent))
    results += thisEntry

    results.toArray
  }

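  // Worked example (hypothetical IDs): feeding the GRAY node
  //   (100, (Array(200, 300), 0, "GRAY", 100))
  // through bfsMap yields three entries: two new GRAY frontier nodes,
  //   (200, (Array(), 1, "GRAY", 100)) and (300, (Array(), 1, "GRAY", 100)),
  // plus the original node recolored BLACK:
  //   (100, (Array(200, 300), 0, "BLACK", 100))
  // A WHITE or BLACK input node passes through unchanged as a single entry.
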
  /** Combine nodes for the same heroID, preserving the shortest distance and darkest color. */
  def bfsReduce(data1: BFSData, data2: BFSData): BFSData = {

    // Extract data that we are combining
    val edges1: Array[Int] = data1._1
    val edges2: Array[Int] = data2._1
    val distance1: Int = data1._2
    val distance2: Int = data2._2
    val color1: String = data1._3
    val color2: String = data2._3
    val parent1: Int = data1._4
    val parent2: Int = data2._4

    // Default node values
    var distance: Int = 9999
    var color: String = "WHITE"
    val edges: ArrayBuffer[Int] = ArrayBuffer()
    var parent: Int = parent1

    // See if one is the original node with its connections.
    // If so, preserve them.
    if (edges1.length > 0) {
      edges ++= edges1
    }
    if (edges2.length > 0) {
      edges ++= edges2
    }

    // Preserve the minimum distance, and the parent that produced it
    if (distance1 < distance) {
      distance = distance1
      parent = parent1
    }
    if (distance2 < distance) {
      distance = distance2
      parent = parent2
    }

    // Preserve the darkest color (WHITE < GRAY < BLACK)
    if (color1 == "WHITE" && (color2 == "GRAY" || color2 == "BLACK")) {
      color = color2
    }
    if (color1 == "GRAY" && color2 == "BLACK") {
      color = color2
    }
    if (color2 == "WHITE" && (color1 == "GRAY" || color1 == "BLACK")) {
      color = color1
    }
    if (color2 == "GRAY" && color1 == "BLACK") {
      color = color1
    }
    if (color1 == "GRAY" && color2 == "GRAY") {
      color = color1
    }
    if (color1 == "BLACK" && color2 == "BLACK") {
      color = color1
    }

    (edges.toArray, distance, color, parent)
  }

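  // Worked example (hypothetical values): merging the original, still-WHITE copy of a hero,
  //   (Array(400, 500), 9999, "WHITE", -1),
  // with a GRAY copy produced by expanding its parent,
  //   (Array(), 1, "GRAY", 100),
  // keeps the connection list, the shorter distance, and the darker color:
  //   (Array(400, 500), 1, "GRAY", 100)
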
  /** Our main function where the action happens */
  def main(args: Array[String]): Unit = {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "DegreesOfSeparation")

    // Our accumulator, used to signal when we find the target
    // character in our BFS traversal.
    hitCounter = Some(sc.longAccumulator("Hit Counter"))

    // Build "iteration 0" and merge any duplicate hero IDs up front.
    var iterationRdd = createStartingRdd(sc)
    iterationRdd = iterationRdd.reduceByKey(bfsReduce)

    for (iteration <- 1 to 10) {
      println("Running BFS Iteration# " + iteration)

      // Create new vertices as needed to darken or reduce distances in the
      // reduce stage. If we encounter the node we're looking for as a GRAY
      // node, increment our accumulator to signal that we're done.
      val mapped = iterationRdd.flatMap(bfsMap)

      // Note that the mapped.count() action here forces the RDD to be evaluated,
      // and that's the only reason our accumulator is actually updated.
      println("Processing " + mapped.count() + " values.")

      // The reducer combines data for each character ID, preserving the darkest
      // color and shortest path.
      iterationRdd = mapped.reduceByKey(bfsReduce)

      if (hitCounter.isDefined) {
        val hitCount = hitCounter.get.value
        if (hitCount > 0) {
          println("Hit the target character! From " + hitCount +
              " different direction(s).")

          // Fully processed (BLACK) nodes carry their settled parent pointers,
          // so we can walk those pointers back from the target to print the path.
          val completeNode = iterationRdd.filter(x => x._2._3 == "BLACK")

          var currentSearch = iterationRdd.filter(x => x._1 == targetCharacterID && x._2._3 != "WHITE").first()._2._4

          println()
          print(s"Path : $targetCharacterID -> $currentSearch")

          while (currentSearch != startCharacterID) {
            currentSearch = completeNode.filter(x => x._1 == currentSearch).first()._2._4
            print(s" -> $currentSearch")
          }

          println()
          return
        }
      }
    }
  }
}
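
// How one might run this (an assumption, not part of the original paste): with an
// sbt project that has Spark on the classpath and the Marvel graph data at
// data/marvel-graph.txt, something like
//   sbt "runMain com.sundogsoftware.spark.DegreesOfSeparation"
// should print each BFS iteration, the "Hit the target character!" message, and the
// reconstructed path from the target back to the start character.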