Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
def add_division_district(self, data, d_division_district):
    """Attach district and division attributes to every row of ``data``.

    Coordinates of ``data`` are rounded to 4 decimal places, the distinct
    rounded points are tested against each district polygon with the
    ``ST_Intersects`` SQL function, and the matching district attributes
    are left-joined back onto ``data`` by the rounded coordinates.

    The ``ST_*`` SQL functions used in the predicate are not defined in
    this block; they must already be registered in the Spark session.

    Args:
        data: DataFrame with ``longitude`` and ``latitude`` columns.
        d_division_district: DataFrame with ``district_polygon``,
            ``district_okato_id``, ``district_nm``, ``division_okato_id``
            and ``division_nm`` columns. It is collected to the driver in
            full. NOTE(review): ``district_polygon`` is presumably a WKT
            string - confirm against the table definition.

    Returns:
        DataFrame with the columns ``district_id``, ``district_nm``,
        ``division_id`` and ``division_nm`` followed by all original
        columns of ``data``. The four added columns are null for rows
        whose point matched no district. The number of rows equals that
        of ``data``; if a point intersects several districts, which one
        is kept is unspecified.
    """
    # SQL predicate evaluated per point: the shape is built from the
    # temporary `polygon` column, falling back to ST_Multipolygon when
    # ST_Polygon yields null.
    esri_check_coordinates = \
        'ST_Intersects(nvl(ST_Polygon(polygon),ST_Multipolygon(polygon)), ST_Point(longitude, latitude))'

    def points_in_district(coordinates, district_row):
        """Return the points intersecting one district, tagged with its attributes."""
        # Keep only the points that belong to the district being processed.
        district_grid = coordinates \
            .withColumn('polygon', F.lit(district_row['district_polygon'])) \
            .filter(F.expr(esri_check_coordinates)) \
            .drop('polygon')
        # Attach all of the district information as constant columns.
        return district_grid \
            .withColumn('district_okato_id', F.lit(district_row['district_okato_id'])) \
            .withColumn('district_nm', F.lit(district_row['district_nm'])) \
            .withColumn('division_okato_id', F.lit(district_row['division_okato_id'])) \
            .withColumn('division_nm', F.lit(district_row['division_nm']))

    # Distinct rounded points, so each location is tested once per district
    # instead of once per input row.
    data_locations = data \
        .select(
            F.round(data.longitude, 4).alias("longitude"),
            F.round(data.latitude, 4).alias("latitude"),
        ) \
        .distinct()

    # An empty, explicitly typed frame seeds the union so the lookup has a
    # well-defined schema even when there are no districts at all.
    data_loc_schema = StructType([
        StructField("longitude", DoubleType(), True),
        StructField("latitude", DoubleType(), True),
        StructField("district_okato_id", StringType(), True),
        StructField("district_nm", StringType(), True),
        StructField("division_okato_id", StringType(), True),
        StructField("division_nm", StringType(), True)
    ])
    data_loc = self.spark.createDataFrame(data=[], schema=data_loc_schema)

    # Append every district's points to the resulting big grid.
    for district_row in d_division_district.collect():
        data_loc = data_loc.union(points_in_district(data_locations, district_row))

    # A point lying on a shared border or inside overlapping polygons
    # matches several districts. Keep a single row per point, otherwise the
    # left join below would duplicate the corresponding rows of `data`.
    data_loc = data_loc.dropDuplicates(['longitude', 'latitude'])

    return data.join(
        data_loc,
        ((F.round(data.longitude, 4) == data_loc.longitude) &
         (F.round(data.latitude, 4) == data_loc.latitude)),
        how='left'
    ).select(
        data_loc.district_okato_id.alias('district_id'),
        data_loc.district_nm.alias('district_nm'),
        data_loc.division_okato_id.alias('division_id'),
        data_loc.division_nm.alias('division_nm'),
        *[data[column] for column in data.columns]
    )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement