Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Reorganises the raw CSV tables found under `datadir` into reduced Parquet files.
 *
 * Steps:
 *  1. Load the `person`, `interest` and `knows` CSV files (pipe-delimited, with
 *     header, schema inferred).
 *  2. Drop the person columns that are not needed downstream.
 *  3. Reduce `knows` to edges whose two endpoints have the same `locatedIn`
 *     value and for which the reverse edge also exists in `knows`.
 *  4. Keep only the `person` and `interest` rows whose `personId` appears in
 *     the reduced `knows` table.
 *  5. Write the three results to `person.parquet`, `interest.parquet` and
 *     `knows.parquet` inside `datadir`, overwriting existing output.
 *
 * Side effects: registers the temp views `person`, `interest`, `knows` and
 * `knowsLocp` on the session and prints the elapsed wall-clock time.
 *
 * Relies on a `spark` session and its implicits (for the `$"..."` column
 * syntax) being in scope, as they are in spark-shell.
 *
 * @param datadir directory that holds the CSV inputs and receives the Parquet outputs
 */
def reorg(datadir: String): Unit = {
  val t0 = System.nanoTime()

  // All three inputs share the same reader configuration; only the file stem differs.
  def loadCsv(table: String) =
    spark.read
      .format("csv")
      .option("header", "true")
      .option("delimiter", "|")
      .option("inferschema", "true")
      .load(datadir + "/" + table + ".*csv.*")

  // Load the three tables
  val person = loadCsv("person")
  val interest = loadCsv("interest")
  val knows = loadCsv("knows")

  // Remove unnecessary columns
  val person_new =
    person.drop("firstName", "lastName", "gender", "creationDate", "locationIP", "browserUsed")

  // Remove friends that don't live in the same location.
  // Idea: add two columns to the knows table holding the locatedIn of the person
  // and of the friend, keep only the rows where both match, then project the
  // two location columns away again.
  person.createOrReplaceTempView("person")
  interest.createOrReplaceTempView("interest")
  knows.createOrReplaceTempView("knows")

  // Add person location to knows table
  val knowsLocp = spark.sql("SELECT knows.*, person.locatedIn AS PersonLoc FROM knows LEFT JOIN person ON knows.personId = person.personId")
  knowsLocp.createOrReplaceTempView("knowsLocp")

  // Add friend location to knows table
  val knowsLocfp = spark.sql("SELECT knowsLocp.*, person.locatedIn AS FriendLoc FROM knowsLocp LEFT JOIN person ON knowsLocp.friendId = person.personId")

  // Only keep the entries where both live in the same place.
  // friendId is renamed to "f" so it can serve as the join key below.
  val knows_new = knowsLocfp
    .filter($"PersonLoc" === $"FriendLoc")
    .select($"personId", $"friendId".alias("f"))

  // Only keep mutual knows: for an edge personId -> f, look up the edges that
  // start at f and keep the row only if one of them points back to personId.
  val mutual = knows_new
    .join(knows.select($"personId".alias("f"), $"friendId"), "f")
    .filter($"personId" === $"friendId")
    .select($"personId", $"f".alias("friendId"))

  // Drop persons and interests that have no mutual friend left. A left-semi
  // join keeps exactly the left-hand rows that have a match, without adding
  // columns or duplicating rows. (Spark cannot DELETE from a DataFrame or
  // temp view, so the filtering has to be expressed as a join.)
  val mutualIds = mutual.select($"personId")
  val person_filtered = person_new.join(mutualIds, Seq("personId"), "left_semi")
  val interest_filtered = interest.join(mutualIds, Seq("personId"), "left_semi")

  // Write to parquet files
  person_filtered.write.format("parquet").mode("overwrite").save(datadir + "/person.parquet")
  interest_filtered.write.format("parquet").mode("overwrite").save(datadir + "/interest.parquet")
  mutual.write.format("parquet").mode("overwrite").save(datadir + "/knows.parquet")

  val t1 = System.nanoTime()
  println("reorg time: " + (t1 - t0) / 1000000 + "ms")
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement