spatial set operations in Apache Spark
amnonkhen
Aug 3rd, 2014
'''
intersection of two range sets using apache spark
'''
from pyspark import SparkContext

def intersects(a, b):
    # two closed ranges overlap iff the later start is no greater than
    # the earlier end, e.g. intersects((1, 10), (9, 10)) -> True
    return max(a[0], b[0]) <= min(a[1], b[1])

def intersection(a, b):
    # overlap of two intersecting ranges: (latest start, earliest end)
    return (max(a[0], b[0]), min(a[1], b[1]))

if __name__ == "__main__":
    sc = SparkContext(appName="RangeSetIntersect")
    # a and b are two sets of 1-dimensional ranges
    a = sc.parallelize([(1, 10), (11, 14), (15, 23), (24, 40)])
    b = sc.parallelize([(1, 8), (9, 10), (11, 16), (17, 29), (31, 35), (36, 40)])

    # how do we choose the grid size? (see the note after the listing)
    gridSize = 10

    # map each range to all the grid cells that it touches
    # (integer division so this also works on Python 3)
    aPart = a.map(lambda x: (range(x[0] // gridSize + 1, x[1] // gridSize + 2), x)) \
        .flatMap(lambda x: [(y, x[1]) for y in x[0]])
    bPart = b.map(lambda x: (range(x[0] // gridSize + 1, x[1] // gridSize + 2), x)) \
        .flatMap(lambda x: [(y, x[1]) for y in x[0]])

    # join the ranges by grid cell number
    # keep intersecting pairs
    # compute the intersection ranges
    # drop duplicates (a pair that shares several grid cells is joined several times)
    # sort by start coordinate
    int_a_b = aPart.join(bPart) \
        .filter(lambda x: intersects(x[1][0], x[1][1])) \
        .map(lambda x: intersection(x[1][0], x[1][1])) \
        .distinct() \
        .sortByKey()

    # here we collect or save the result
    print(int_a_b.collect())
    sc.stop()
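
For reference, running the script against the sample ranges above should print the pairwise overlaps sorted by start coordinate, i.e. something like:

    [(1, 8), (9, 10), (11, 14), (15, 16), (17, 23), (24, 29), (31, 35), (36, 40)]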
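
On the "how do we choose the grid size?" question: the paste leaves it open. One plausible heuristic (an assumption, not part of the original) is to derive it from the data so that a typical range touches only one or two cells, e.g. a small multiple of the mean range length. A minimal sketch, where estimate_grid_size is a hypothetical helper:

    def estimate_grid_size(ranges_rdd, factor=2):
        # hypothetical helper: grid size ~ a small multiple of the mean
        # range length, so most ranges map to only a couple of grid cells
        lengths = ranges_rdd.map(lambda r: r[1] - r[0] + 1)
        return max(1, int(factor * lengths.mean()))

    # e.g.: gridSize = estimate_grid_size(a.union(b))

Too small a grid size blows up the number of (cell, range) pairs; too large a one puts most ranges in the same cell and degrades the join back to an all-pairs comparison.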