Advertisement
dan_sml

Untitled

Jul 21st, 2022
958
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.56 KB | None | 0 0
  1. def process_row(grid, coordinates, district_row):
  2.     # district_grid = coordinates.join(district, on=F.expr(esri_check_coordinates_districts), how='inner')
  3.     polygon_str = district_row['district_polygon']
  4.     esri_check_coordinates = \
  5.         'ST_Intersects(nvl(ST_Polygon(polygon),ST_Multipolygon(polygon)), ST_Point(gps_x, gps_y))'
  6.     district_grid = coordinates.withColumn('polygon', F.lit(polygon_str)) \
  7.         .filter(F.expr(esri_check_coordinates)) \
  8.         .drop('polygon')
  9.     # coordinates = coordinates.filter((coordinates.gps_x != district_grid.gps_x) |\
  10.     #                                  (coordinates.gps_y != district_grid.gps_y))
  11.     district_grid = district_grid.withColumn('district_okato_id', F.lit(district_row['district_okato_id']))\
  12.         .withColumn('district_nm', F.lit(district_row['district_nm']))\
  13.         .withColumn('division_okato_id', F.lit(district_row['division_okato_id']))\
  14.         .withColumn('division_nm', F.lit(district_row['division_nm']))
  15.     grid = grid.union(district_grid)
  16.     return grid
  17.  
  18.  
  19. districts_rows = districts.collect()
  20.  
  21. grid_schema = schema = StructType([
  22.     StructField("gps_x", DoubleType(), True),
  23.     StructField("gps_y", DoubleType(), True),
  24.     StructField("district_okato_id", StringType(), True),
  25.     StructField("district_nm", StringType(), True),
  26.     StructField("division_okato_id", StringType(), True),
  27.     StructField("division_nm", StringType(), True)
  28.   ])
  29.  
  30. grid = spark.createDataFrame(data=[], schema=grid_schema)
  31.  
  32. for row in districts_rows:
  33.     grid = process_row(grid, coords, row)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement