Untitled

a guest
Apr 30th, 2017
/*
Contents of /tmp/data.txt (each line: a user id followed by the ids of that user's friends)
0 1 3 4 5
1 0 2 4
2 1 3 4
3 0 2
4 0 1 2 5
5 0 4

Output (each pair of friends, followed by the friends they have in common):
(0,1) 4
(0,4) 1 5
(0,5) 4
(1,2) 4
(1,4) 0 2
(2,4) 1
(4,5) 0
*/

/**
Map function: takes a line such as "5 0 4" (a user id followed by that user's friends)
and, for each friend, emits the pair of ids in ascending order (the 'key'; ids are
compared as strings) together with the user's entire friend list.

Example: pairMapper("4 0 1 2 5") returns the following list:
(0, 4) => 0 1 2 5
(1, 4) => 0 1 2 5
(2, 4) => 0 1 2 5
(4, 5) => 0 1 2 5
*/
def pairMapper(line: String): Seq[((String, String), Set[String])] = {
  // Split on any run of whitespace so stray spaces or tabs do not produce empty ids
  val words = line.trim.split("\\s+")
  val key = words(0)
  val friends = words.drop(1).toSeq
  val friendSet = friends.toSet
  friends.map { friend =>
    // Order the pair so (a, b) and (b, a) end up under the same key
    val pair = if (key < friend) (key, friend) else (friend, key)
    (pair, friendSet)
  }
}

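/**
Reduce function: reduceByKey groups the emitted values by key (a pair of friends), and this
function intersects the friend sets for that pair; what remains are the common friends.
This assumes every friendship appears on both users' lines (as in the sample data): a pair
emitted only once is never intersected and keeps the full friend list.
*/
def pairReducer(accumulator: Set[String], set: Set[String]): Set[String] = {
  accumulator intersect set
}
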
// `sc` is the SparkContext that spark-shell predefines.
// Repoint the path to a directory / file in HDFS to run on a cluster.
val data = sc.textFile("file:///tmp/data.txt")
val results = data.flatMap(pairMapper)
  .reduceByKey(pairReducer)
  .filter(_._2.nonEmpty) // drop pairs that have no friends in common
  .sortByKey()

results.collect().foreach { case (pair, common) =>
  // Sort the common friends so the printed order is deterministic
  println(s"$pair ${common.toSeq.sorted.mkString(" ")}")
}
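To check the map/reduce logic without a Spark cluster, the same steps can be replayed on plain Scala collections. This is a sketch under stated assumptions, not part of the original job. `LocalMutualFriends`, `mapLine`, `run` and `sampleLines` are hypothetical names. `groupBy` followed by `reduce(_ intersect _)` stands in for Spark's `reduceByKey(pairReducer)`. The sample lines are copied from the data comment at the top.

```scala
// Hypothetical, Spark-free replay of the mutual-friends job using Scala collections.
object LocalMutualFriends {
  // Mirrors pairMapper: emit (orderedPair, friendSet) for every friend on the line.
  def mapLine(line: String): Seq[((String, String), Set[String])] = {
    val words = line.trim.split("\\s+").toSeq
    val key = words.head
    val friends = words.tail
    val friendSet = friends.toSet
    friends.map { friend =>
      val pair = if (key < friend) (key, friend) else (friend, key)
      (pair, friendSet)
    }
  }

  // groupBy + reduce(intersect) plays the role of reduceByKey(pairReducer);
  // empty intersections are dropped and pairs sorted, like filter + sortByKey.
  def run(lines: Seq[String]): Seq[((String, String), Seq[String])] =
    lines
      .filter(_.trim.nonEmpty)
      .flatMap(mapLine)
      .groupBy(_._1)
      .map { case (pair, emitted) => (pair, emitted.map(_._2).reduce(_ intersect _)) }
      .filter(_._2.nonEmpty)
      .toSeq
      .sortBy(_._1)
      .map { case (pair, common) => (pair, common.toSeq.sorted) }

  // Sample input, one "user friend friend ..." line per user
  val sampleLines: Seq[String] = Seq(
    "0 1 3 4 5",
    "1 0 2 4",
    "2 1 3 4",
    "3 0 2",
    "4 0 1 2 5",
    "5 0 4"
  )

  def main(args: Array[String]): Unit =
    run(sampleLines).foreach { case (pair, common) =>
      println(s"$pair ${common.mkString(" ")}")
    }
}
```

Calling `LocalMutualFriends.main(Array())` should print the same seven lines as the expected output in the header comment. Keeping the logic in plain functions like this makes it easy to unit-test before submitting the Spark job.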