Guest User

Untitled

a guest
Jan 21st, 2019
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.18 KB | None | 0 0
  1. def create_edges(line):
  2. a = [int(x) for x in line.split(" ")]
  3. edges_list=[]
  4.  
  5. for i in range(0, len(a)-1):
  6. for j in range(i+1 ,len(a)):
  7. edges_list.append((a[i],a[j]))
  8. edges_list.append((a[j],a[i]))
  9. return edges_list
  10.  
  11. # adj_list.txt is a txt file containing adjacency list of the graph.
  12. adjacency_list = sc.textFile("adj_list.txt")
  13.  
  14. edges_rdd = adjacency_list.flatMap(lambda line : create_edges(line)).distinct()
  15.  
  16. def largeStarInit(record):
  17. a, b = record
  18. yield (a,b)
  19. yield (b,a)
  20.  
  21. def largeStar(record):
  22. a, b = record
  23. t_list = list(b)
  24. t_list.append(a)
  25. list_min = min(t_list)
  26. for x in b:
  27. if a < x:
  28. yield (x,list_min)
  29.  
  30. def smallStarInit(record):
  31. a, b = record
  32. if b<=a:
  33. yield (a,b)
  34. else:
  35. yield (b,a)
  36.  
  37. def smallStar(record):
  38. a, b = record
  39. t_list = list(b)
  40. t_list.append(a)
  41. list_min = min(t_list)
  42. for x in t_list:
  43. if x!=list_min:
  44. yield (x,list_min)
  45.  
  46. #Handle case for single nodes
  47. def single_vertex(line):
  48. a = [int(x) for x in line.split(" ")]
  49. edges_list=[]
  50. if len(a)==1:
  51. edges_list.append((a[0],a[0]))
  52. return edges_list
  53.  
  54. iteration_num =0
  55. while 1==1:
  56. if iteration_num==0:
  57. print "iter", iteration_num
  58. large_star_rdd = edges_rdd.groupByKey().flatMap(lambda x : largeStar(x))
  59. small_star_rdd = large_star_rdd.flatMap(lambda x : smallStarInit(x)).groupByKey().flatMap(lambda x : smallStar(x)).distinct()
  60. iteration_num += 1
  61.  
  62. else:
  63. print "iter", iteration_num
  64. large_star_rdd = small_star_rdd.flatMap(lambda x: largeStarInit(x)).groupByKey().flatMap(lambda x : largeStar(x)).distinct()
  65. small_star_rdd = large_star_rdd.flatMap(lambda x : smallStarInit(x)).groupByKey().flatMap(lambda x : smallStar(x)).distinct()
  66. iteration_num += 1
  67. #check Convergence
  68.  
  69. changes = (large_star_rdd.subtract(small_star_rdd).union(small_star_rdd.subtract(large_star_rdd))).collect()
  70. if len(changes) == 0 :
  71. break
  72.  
  73. single_vertex_rdd = adjacency_list.flatMap(lambda line : single_vertex(line)).distinct()
  74.  
  75. answer = single_vertex_rdd.collect() + large_star_rdd.collect()
  76.  
  77. print answer[:10]
Add Comment
Please, Sign In to add comment