Advertisement
Guest User

Untitled

a guest
Sep 21st, 2019
252
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.76 KB | None | 0 0
  /**
   * Reorganizes the pipe-delimited CSV tables `person`, `interest` and `knows`
   * found in `datadir` and writes reduced versions back to `datadir` as Parquet.
   *
   * Steps:
   *  1. drop person columns that are not needed downstream;
   *  2. keep only `knows` rows whose two endpoints have the same `locatedIn`;
   *  3. of those, keep only rows whose reverse (friendId -> personId) also
   *     appears in the original `knows` table ("mutual" friendships);
   *  4. keep only persons, and their interests, that have at least one mutual friend.
   *
   * Side effects: registers the temp views `person`, `interest`, `knows`,
   * `knowsLocp`, `mutual` and `person_new` in the current Spark session,
   * overwrites `person.parquet`, `interest.parquet` and `knows.parquet` under
   * `datadir`, and prints the elapsed wall-clock time in milliseconds.
   *
   * NOTE(review): assumes a `SparkSession` named `spark` with `spark.implicits._`
   * in scope (as in spark-shell), and that `interest` has a `personId` column.
   * Confirm both for non-shell use.
   *
   * @param datadir directory holding the `person.*csv.*`, `interest.*csv.*` and
   *                `knows.*csv.*` inputs; the Parquet outputs are written here too
   */
  def reorg(datadir: String): Unit = {
    val t0 = System.nanoTime()

    // All three inputs share the same layout: header row, '|' delimiter, inferred types.
    def loadCsv(table: String) =
      spark.read.format("csv")
        .option("header", "true")
        .option("delimiter", "|")
        .option("inferschema", "true")
        .load(s"$datadir/${table}.*csv.*")

    val person   = loadCsv("person")
    val interest = loadCsv("interest")
    val knows    = loadCsv("knows")

    // Remove unnecessary columns
    val personNew = person.drop("firstName", "lastName", "gender", "creationDate", "locationIP", "browserUsed")

    // Remove friends that don't live in the same location: attach the location of
    // both endpoints to every knows row, then keep rows where the two match.
    person.createOrReplaceTempView("person")
    interest.createOrReplaceTempView("interest")
    knows.createOrReplaceTempView("knows")
    // Add person location to knows table
    val knowsLocp = spark.sql("SELECT knows.*, person.locatedIn AS PersonLoc FROM knows LEFT JOIN person ON knows.personId = person.personId")
    knowsLocp.createOrReplaceTempView("knowsLocp")
    // Add friend location to knows table
    val knowsLocfp = spark.sql("SELECT knowsLocp.*, person.locatedIn AS FriendLoc FROM knowsLocp LEFT JOIN person ON knowsLocp.friendId = person.personId")
    // `===` evaluates to null when either location is missing, so filter drops those rows too.
    val knowsNew = knowsLocfp
      .filter($"PersonLoc" === $"FriendLoc")
      .select($"personId", $"friendId".alias("f"))

    // Only keep mutual knows: (p, f) survives when (f, p) is also in the original knows table.
    val mutual = knowsNew
      .join(knows.select($"personId".alias("f"), $"friendId"), "f")
      .filter($"personId" === $"friendId")
      .select($"personId", $"f".alias("friendId"))

    // Register as SQL tables for the filtering queries below
    mutual.createOrReplaceTempView("mutual")
    personNew.createOrReplaceTempView("person_new")

    // Keep only persons (and their interests) that have at least one mutual friend.
    // Spark SQL cannot DELETE from a temp view, and DELETE would not return the
    // surviving rows anyway, so select the rows to keep. `mutual` is symmetric:
    // (p, f) is kept only if (f, p) is kept as well. Checking personId alone
    // therefore covers both endpoints. IN also avoids NOT IN's "no rows at all"
    // result when the subquery contains a null.
    val personFiltered   = spark.sql("SELECT * FROM person_new WHERE personId IN (SELECT personId FROM mutual)")
    val interestFiltered = spark.sql("SELECT * FROM interest WHERE personId IN (SELECT personId FROM mutual)")

    // Write to parquet files
    personFiltered.write.format("parquet").mode("overwrite").save(s"$datadir/person.parquet")
    interestFiltered.write.format("parquet").mode("overwrite").save(s"$datadir/interest.parquet")
    mutual.write.format("parquet").mode("overwrite").save(s"$datadir/knows.parquet")

    val t1 = System.nanoTime()
    println(s"reorg time: ${(t1 - t0) / 1000000}ms")
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement