Advertisement
Guest User

Untitled

a guest
Sep 19th, 2018
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.14 KB | None | 0 0
  /**
   * Placeholder for a data reorganisation step.
   *
   * The body currently performs no work on `datadir`; it only measures the
   * (empty) step and prints the elapsed wall-clock time in whole milliseconds.
   *
   * @param datadir directory that holds the data set (not read yet)
   */
  def reorg(datadir :String) :Unit =
  {
    val started = System.nanoTime()

    // nothing here (yet)

    val finished = System.nanoTime()
    // nanoseconds -> milliseconds, integer division as before
    println(s"reorg time: ${(finished - started)/1000000}ms")
  }
  10.  
  /**
   * Evaluates the query over the pipe-delimited CSV tables under `datadir`,
   * prints up to 1000 result rows plus the elapsed time, and returns the result.
   *
   * A result row (p, f, score) means:
   *  - p has no interest equal to `a1`, and `score` is the number of p's interest
   *    rows that match one of `a1`..`a4`;
   *  - p's birthday, encoded as month*100 + day, lies in [`lo`, `hi`];
   *  - f has an interest equal to `a1` and the same `locatedIn` value as p;
   *  - p knows f and f knows p.
   *
   * NOTE(review): relies on `spark`, the `$` column syntax and the Spark SQL
   * functions being in scope (e.g. when loaded into spark-shell) - confirm.
   *
   * @param datadir directory containing the person / interest / knows CSV files
   * @param a1      interest the friend must have and the person must not have
   * @param a2      additional interest counted towards the score
   * @param a3      additional interest counted towards the score
   * @param a4      additional interest counted towards the score
   * @param lo      inclusive lower bound on month*100 + day of the birthday
   * @param hi      inclusive upper bound on month*100 + day of the birthday
   * @return (p, f, score) rows ordered by score descending, then p, then f
   */
  def cruncher(datadir :String, a1 :Int, a2 :Int, a3 :Int, a4 :Int, lo :Int, hi :Int) :org.apache.spark.sql.DataFrame =
  {
    val started = System.nanoTime()

    // the three tables share the same CSV settings; only the file pattern differs
    def readCsv(pattern :String) :org.apache.spark.sql.DataFrame =
      spark.read.format("csv").
            option("header", "true").
            option("delimiter", "|").
            option("inferschema", "true").
            load(datadir + pattern)

    val personTbl   = readCsv("/person.*csv.*")
    val interestTbl = readCsv("/interest.*csv.*")
    val knowsTbl    = readCsv("/knows.*csv.*")

    // keep only the (personId, interest) rows for a1..a4 and tag each one with
    // "nofan", which is true iff the row is not an a1 row
    val relevant = interestTbl.filter($"interest".isin(a1, a2, a3, a4))
    val tagged   = relevant.withColumn("nofan", $"interest".notEqual(a1))

    // per person: score = number of relevant interest rows; the minimum over the
    // boolean "nofan" stays true only when the person has no a1 row at all
    val perPerson = personTbl.join(tagged, "personId").
                              groupBy("personId", "locatedIn", "birthday").
                              agg(count("personId").as("score"), min("nofan").as("nofan"))

    // candidates: score above zero, not a fan of a1, birthday key inside [lo, hi]
    val bdayKey    = month($"birthday")*100 + dayofmonth($"birthday")
    val candidates = perPerson.filter($"score" > 0 && $"nofan").
                               withColumn("bday", bdayKey).
                               filter($"bday" >= lo && $"bday" <= hi)

    // (personId, ploc, score) joined with knows gives one row per (person, friend)
    val slimCands  = candidates.select($"personId", $"locatedIn".alias("ploc"), $"score")
    val withFriend = slimCands.join(knowsTbl, "personId")

    // (friendId, floc) for everybody who is a fan of a1, taken from the same aggregate
    val fans = perPerson.filter(!$"nofan").
                         select($"personId".alias("friendId"), $"locatedIn".alias("floc"))

    // the friend must be a fan living in the same location; keep (p, f, score)
    val sameLoc = withFriend.join(fans, "friendId").
                             filter($"ploc"===$"floc").
                             select($"personId".alias("p"), $"friendId".alias("f"), $"score")

    // mutual friendship: keep a pair only if knows also contains the reverse edge f -> p
    val reverseEdges = knowsTbl.select($"personId".alias("f"), $"friendId")
    val mutual       = sameLoc.join(reverseEdges, "f").filter($"p"===$"friendId")

    // final projection and ordering
    val ret = mutual.select($"p", $"f", $"score").
                     orderBy(desc("score"), asc("p"), asc("f"))

    ret.show(1000) // force execution now, and display results to stdout

    val elapsedMs = (System.nanoTime() - started)/1000000
    println("cruncher time: " + elapsedMs + "ms")

    ret
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement