Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
def process_row(grid, coordinates, district_row):
    """Append the coordinates that fall inside one district to ``grid``.

    The district boundary is read from ``district_row['district_polygon']``.
    It is fed to the ESRI ``ST_Polygon`` / ``ST_Multipolygon`` SQL functions,
    so it is presumably WKT text (TODO confirm). Every row of ``coordinates``
    for which ``ST_Intersects(boundary, ST_Point(gps_x, gps_y))`` holds is
    kept. Each kept row is tagged with the district's and division's OKATO id
    and name, and the result is unioned onto ``grid``.

    Args:
        grid: DataFrame accumulated so far. Its column names must be those of
            ``coordinates`` plus ``district_okato_id``, ``district_nm``,
            ``division_okato_id`` and ``division_nm``.
        coordinates: DataFrame with ``gps_x`` and ``gps_y`` columns.
        district_row: Row-like object indexable by ``district_polygon``,
            ``district_okato_id``, ``district_nm``, ``division_okato_id`` and
            ``division_nm``.

    Returns:
        A new DataFrame: ``grid`` followed by the matching, tagged coordinates.
    """
    # nvl() falls back to ST_Multipolygon when ST_Polygon returns NULL,
    # presumably for MULTIPOLYGON boundaries.
    esri_check_coordinates = \
        'ST_Intersects(nvl(ST_Polygon(polygon),ST_Multipolygon(polygon)), ST_Point(gps_x, gps_y))'
    # Attach the polygon as a literal column rather than splicing it into the
    # SQL text, so quotes inside the polygon string cannot break the expression.
    district_grid = (
        coordinates
        .withColumn('polygon', F.lit(district_row['district_polygon']))
        .filter(F.expr(esri_check_coordinates))
        .drop('polygon')
    )
    # Copy the district/division attributes onto every matching point.
    for column in ('district_okato_id', 'district_nm',
                   'division_okato_id', 'division_nm'):
        district_grid = district_grid.withColumn(column, F.lit(district_row[column]))
    # Resolve columns by name, not position. A positional union would silently
    # swap gps_x/gps_y if ``coordinates`` stores them in the other order.
    return grid.unionByName(district_grid)
# Bring the district boundaries to the driver. Each collected row drives one
# filter-and-union pass over ``coords`` via process_row.
districts_rows = districts.collect()

# Result layout: the two point coordinates (doubles), then the four
# district/division attributes (strings). All fields are nullable.
# ``schema`` is kept as an alias of ``grid_schema`` for any later code using it.
grid_schema = schema = StructType(
    [StructField(coord_name, DoubleType(), True)
     for coord_name in ('gps_x', 'gps_y')]
    + [StructField(attr_name, StringType(), True)
       for attr_name in ('district_okato_id', 'district_nm',
                         'division_okato_id', 'division_nm')]
)

# Seed with an empty frame so the result carries this schema even when
# ``districts_rows`` is empty.
grid = spark.createDataFrame(data=[], schema=grid_schema)
for row in districts_rows:
    grid = process_row(grid, coords, row)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement