Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- '''
- intersection of two range sets using apache spark
- '''
- from pyspark import SparkContext
def intersects(a, b):
    """Return True when ranges *a* and *b* overlap.

    Both arguments are indexable ``(start, end)`` pairs treated as closed
    intervals, so two ranges that merely touch at an endpoint (e.g. ``(1, 5)``
    and ``(5, 9)``) count as overlapping.
    """
    latest_start = max(a[0], b[0])
    earliest_end = min(a[1], b[1])
    return latest_start <= earliest_end
def intersection(a, b):
    """Return the overlap of ranges *a* and *b* as a ``(start, end)`` tuple.

    Ranges are indexable ``(start, end)`` pairs. The result is only meaningful
    when ``intersects(a, b)`` holds; for disjoint ranges the returned start is
    greater than the returned end.
    """
    start = max(a[0], b[0])
    end = min(a[1], b[1])
    return start, end
# Demo driver: intersect two small sets of 1-D ranges on Spark by bucketing
# every range into fixed-width grid cells and only comparing ranges that
# share a cell.
if __name__ == "__main__":
    sc = SparkContext(appName="RangeSetIntersect")
    try:
        # a and b are two sets of closed 1-dimensional ranges (start, end).
        a = sc.parallelize([(1, 10), (11, 14), (15, 23), (24, 40)])
        b = sc.parallelize([(1, 8), (9, 10), (11, 16), (17, 29), (31, 35), (36, 40)])

        # Width of one grid cell. Smaller cells mean fewer candidate pairs per
        # cell but more copies of each long range; it is a tuning knob.
        grid_size = 10

        def to_cells(rng):
            """Return ``[(cell, rng), ...]`` for every grid cell *rng* touches."""
            # Floor division keeps cell ids integral: plain "/" produces floats
            # in Python 3, which range() rejects with a TypeError.
            first_cell = rng[0] // grid_size
            last_cell = rng[1] // grid_size
            return [(cell, rng) for cell in range(first_cell, last_cell + 1)]

        a_cells = a.flatMap(to_cells)
        b_cells = b.flatMap(to_cells)

        # Join the ranges by grid cell number, keep overlapping pairs and
        # compute their intersection ranges. A pair of ranges that shares
        # several cells is produced once per shared cell, so duplicates are
        # dropped. The result is sorted by (start, end) so the order is
        # deterministic even for equal start coordinates.
        int_a_b = (
            a_cells.join(b_cells)
            .values()
            .filter(lambda pair: intersects(pair[0], pair[1]))
            .map(lambda pair: intersection(pair[0], pair[1]))
            .distinct()
            .sortBy(lambda rng: rng)
        )

        # Here we collect and report the result (use saveAsTextFile instead for
        # output too large to bring back to the driver).
        for rng in int_a_b.collect():
            print(rng)
    finally:
        # Release the Spark context even if the job fails.
        sc.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement