Advertisement
dan_sml

Untitled

Jul 29th, 2022
685
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.92 KB | None | 0 0
  1.     def add_division_district(self, data, d_division_district):
  2.         def process_row(grid, coordinates, district_row):
  3.             polygon_str = district_row['district_polygon']
  4.             esri_check_coordinates = \
  5.                 'ST_Intersects(nvl(ST_Polygon(polygon),ST_Multipolygon(polygon)), ST_Point(longitude, latitude))'
  6.             # Оставляем только точки только из обрабатываемого района
  7.             district_grid = coordinates \
  8.                 .withColumn('polygon', F.lit(polygon_str)) \
  9.                 .filter(F.expr(esri_check_coordinates)) \
  10.                 .drop('polygon')
  11.             # Присоединяем всю информацию о районах
  12.             district_grid = district_grid \
  13.                 .withColumn('district_okato_id', F.lit(district_row['district_okato_id'])) \
  14.                 .withColumn('district_nm', F.lit(district_row['district_nm'])) \
  15.                 .withColumn('division_okato_id', F.lit(district_row['division_okato_id'])) \
  16.                 .withColumn('division_nm', F.lit(district_row['division_nm']))
  17.             # Присоединяем в итоговую большую сетку
  18.             grid = grid.union(district_grid)
  19.             return grid
  20.  
  21.         data_locations = data\
  22.             .select(F.round(data.longitude, 4).alias("longitude"),
  23.                     F.round(data.latitude, 4).alias("latitude")
  24.             )\
  25.             .distinct()
  26.  
  27.         districts_rows = d_division_district.collect()
  28.  
  29.         data_loc_schema = StructType([
  30.             StructField("longitude", DoubleType(), True),
  31.             StructField("latitude", DoubleType(), True),
  32.             StructField("district_okato_id", StringType(), True),
  33.             StructField("district_nm", StringType(), True),
  34.             StructField("division_okato_id", StringType(), True),
  35.             StructField("division_nm", StringType(), True)
  36.         ])
  37.  
  38.         data_loc = self.spark.createDataFrame(data=[], schema=data_loc_schema)
  39.  
  40.         for row in districts_rows:
  41.             data_loc = process_row(data_loc, data_locations, row)
  42.  
  43.         data_loc = data_loc.select(
  44.             data_loc.longitude,
  45.             data_loc.latitude,
  46.             data_loc.district_okato_id,
  47.             data_loc.district_nm,
  48.             data_loc.division_okato_id,
  49.             data_loc.division_nm,
  50.         )
  51.  
  52.         data = data.join(
  53.             data_loc,
  54.             ((F.round(data.longitude, 4) == data_loc.longitude) &
  55.              (F.round(data.latitude, 4) == data_loc.latitude)),
  56.             how='left'
  57.         ).select(
  58.             data_loc.district_okato_id.alias('district_id'),
  59.             data_loc.district_nm.alias('district_nm'),
  60.             data_loc.division_okato_id.alias('division_id'),
  61.             data_loc.division_nm.alias('division_nm'),
  62.             *[data[column] for column in data.columns]
  63.         )
  64.  
  65.         return data
  66.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement