Advertisement
Guest User

Untitled

a guest
Dec 18th, 2014
186
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.52 KB | None | 0 0
  def theta_join(S, T, join_condition=lambda s, t: s == t):
      """Join two keyed datasets on an arbitrary predicate over their keys.

      Every record of ``S`` and ``T`` is tagged with its origin, scattered to
      one or more regions of a partitioning grid, grouped by region, and the
      records that meet in a region are compared pairwise.

      Args:
          S: dataset of ``(key, value)`` pairs; must support ``map`` and
              ``union`` (presumably a Spark RDD -- confirm with callers).
          T: dataset of ``(key, value)`` pairs with the same interface as ``S``.
          join_condition: two-argument predicate applied to a pair of join
              keys; defaults to equality.

      Returns:
          The dataset produced by the final ``flatMap``; its elements are the
          ``((t_key, s_key), (t_value, s_value))`` tuples built by ``_join``.
      """
      # Computed first: this also publishes the cardinalities that the mapper
      # returned by region_map() reads.
      ct, cs, height, width = _create_partitioning_rectangles(S, T)

      def tag_with(source):
          # Explicit unpacking replaces the Python-2-only ``lambda (k, v): ...``
          # form, which is a SyntaxError on Python 3 (PEP 3113).
          def tag(record):
              key, value = record
              return (key, (source, value))
          return tag

      # Source tags: 1 marks records of S, 2 marks records of T.
      S = S.map(tag_with(1))
      T = T.map(tag_with(2))
      dispatch = reducer(join_condition)
      mapper = region_map(ct, cs, height, width)
      return S.union(T).flatMap(mapper).groupByKey().flatMap(dispatch)
  9.  
  def _create_partitioning_rectangles(S, T):
      """Choose the grid of rectangular regions covering the S x T join matrix.

      Side effect: sets the module globals ``cardS`` and ``cardT`` to the
      smaller and the larger input cardinality respectively (each at least 1).

      Args:
          S: dataset accepted by ``cardinality``.
          T: dataset accepted by ``cardinality``.

      Returns:
          ``(ct, cs, height, width)``: the region counts (``ct`` derived from
          ``cardT``, ``cs`` from ``cardS``) and the extent of one region.
      """
      global cardS
      global cardT
      # Clamp to 1: with an empty input opt_ratio would be 0 and the modulo
      # below would raise ZeroDivisionError. A 1-wide matrix still yields an
      # empty join because one side contributes no records.
      cardS = max(cardinality(S), 1)
      cardT = max(cardinality(T), 1)
      if cardT <= cardS:
          # Only the cardinalities are swapped so that cardS <= cardT below;
          # the datasets themselves are not used again in this function.
          cardT, cardS = cardS, cardT

      # Float on purpose, so the divisions below are true divisions.
      r = 1.0 * sc.defaultParallelism
      opt_ratio = sqrt(cardT * cardS / r)

      if (cardS % opt_ratio) == 0 and (cardT % opt_ratio) == 0:
          # Optimal case: square regions tile the matrix exactly.
          height = width = opt_ratio
          ct, cs = int(cardT / opt_ratio), int(cardS / opt_ratio)
      elif cardS < cardT / r:
          # The smaller side fits in one band: split only along the T axis.
          height, width = cardS, cardT / r
          ct, cs = int(r), 1
      else:
          # Internal invariant, implied by the swap and the failed elif.
          assert cardT / r <= cardS <= cardT
          cs = int(cardS / opt_ratio)
          ct = int(cardT / opt_ratio)
          # Enlarge the squares so the truncated counts still cover the matrix.
          height = width = (1 + 1. / min(cs, ct)) * opt_ratio

      return ct, cs, height, width
  36.  
  def row_lookup(row, ct, cs, height, width):
      """Return the ids of every region in the horizontal band containing ``row``.

      Region ids are 1-based and laid out row-major with ``ct`` regions per
      band, so band ``b`` (0-based) owns ids ``b*ct + 1 .. (b+1)*ct``.

      Args:
          row: 1-based row index (the mapper draws it with ``randint(1, cardS)``).
          ct: number of regions per band.
          cs: number of bands.
          height: number of rows covered by one band.
          width: unused; kept so the signature matches ``col_lookup``.

      Returns:
          A range of ``ct`` consecutive region ids.
      """
      # Shift to 0-based before bucketing so the last row of a band does not
      # spill into the next band; the clamp guards against float rounding.
      band = min(int((row - 1) / height), max(cs - 1, 0))
      start = band * ct + 1
      return range(start, start + ct)
  41.  
  def col_lookup(col, ct, cs, height, width):
      """Return the ids of every region in the vertical strip containing ``col``.

      Region ids are 1-based with ``ct`` regions per band, so strip ``c``
      (0-based) owns ids ``c + 1, c + 1 + ct, ...`` up to ``ct * cs``.

      Args:
          col: 1-based column index (the mapper draws it with
              ``randint(1, cardT)``).
          ct: number of strips (regions per band).
          cs: number of bands.
          height: unused; kept so the signature matches ``row_lookup``.
          width: number of columns covered by one strip.

      Returns:
          A range holding one region id per band.
      """
      # Shift to 0-based before bucketing: without it col == ct * width lands
      # past the last strip and the range can come back empty, dropping the
      # record. The clamp guards against float rounding.
      strip = min(int((col - 1) / width), max(ct - 1, 0))
      return range(strip + 1, ct * cs + 1, ct)
  46.  
  def cardinality(X):
      """Return the number of records in ``X`` as reported by ``X.count()``."""
      size = X.count()
      return size
  49.  
  50.  
  def region_map(ct, cs, height, width):
      """Build the mapper that scatters tagged records over the region grid.

      The returned function takes one ``(key, (source, value))`` record. A
      record with ``source == 1`` is assigned a random row in ``1..cardS`` and
      sent to the regions given by ``row_lookup``; any other record is assigned
      a random column in ``1..cardT`` and sent to the regions given by
      ``col_lookup``. ``cardS`` and ``cardT`` are module globals read at call
      time.

      Returns:
          A function mapping a record to a list of ``(region_id, record)`` pairs.
      """
      def mapper(x):
          # Unpacked only to pick out the source tag; the whole record is emitted.
          _key, (origin, _value) = x

          if origin == 1:
              regions = row_lookup(randint(1, cardS), ct, cs, height, width)
          else:
              regions = col_lookup(randint(1, cardT), ct, cs, height, width)
          return [(region, x) for region in regions]
      return mapper
  62.  
  def reducer(join_condition):
      """Build the per-region function that joins the records meeting there.

      The returned function takes ``(region_id, records)`` where each record is
      ``(key, (source, value))``. It splits the records by source tag (1 versus
      anything else) and hands both groups to ``_join``.

      Returns:
          A function mapping one grouped region to the list returned by ``_join``.
      """
      def dispatch(x):
          # x has the shape (region_id, [record, record, ...])
          _region_id, records = x

          from_s, from_t = [], []
          for key, (source, value) in records:
              bucket = from_s if source == 1 else from_t
              bucket.append((key, value))
          return _join(from_t, from_s, join_condition)
      return dispatch
  77.  
  78. def _join(ttuples, stuples, join_condition):
  79. results = []
  80.  
  81. if len(ttuples) == 0 or len(stuples) == 0:
  82. return []
  83. for t in ttuples:
  84. for s in stuples:
  85. if join_condition(t[0],s[0]):
  86. keys = (t[0],s[0])
  87. values = (t[1],s[1])
  88. results.append((keys,values))
  89. return results
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement