Advertisement
laris_fdz

Untitled

Feb 5th, 2023
671
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.41 KB | None | 0 0
  1. from datetime import datetime
  2. from time import sleep
  3.  
  4. from pyspark.sql import SparkSession, DataFrame
  5. from pyspark.sql import functions as F
  6. from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType, IntegerType, LongType
  7.  
  8. TOPIC_NAME_91 = 'student.topic.cohort5.larisfdz.out'
  9.  
  10. spark_master = 'local'
  11. spark_app_name = "coh5_larisfdz_s8-3-9-1"
  12. spark_jars_packages = ",".join([
  13.     "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
  14.     "org.postgresql:postgresql:42.4.0",
  15. ])
  16.  
  17. postgresql_settings = {
  18.     'user': 'master',
  19.     'password': 'de-master-password',
  20.     'url': 'jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de',
  21.     'driver': 'org.postgresql.Driver',
  22.     'dbtable': 'marketing_companies',
  23.     # 'user': 'student',
  24.     # 'password': 'de-student',
  25. }
  26.  
  27. kafka_security_options = {
  28.     'kafka.security.protocol': 'SASL_SSL',
  29.     'kafka.sasl.mechanism': 'SCRAM-SHA-512',
  30.     'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";',
  31. }
  32. kafka_bootstrap_servers = 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091'
  33.  
  34.  
  35. def spark_init(app_name) -> SparkSession:
  36.     spark_session = SparkSession.builder \
  37.         .master(spark_master) \
  38.         .appName(app_name) \
  39.         .config("spark.jars.packages", spark_jars_packages) \
  40.         .getOrCreate()
  41.  
  42.     return spark_session
  43.  
  44.  
  45. def read_marketing(spark_session: SparkSession) -> DataFrame:
  46.     # - Схема DataFrame ресторанных акций:
  47.     #
  48.     # >> adv_campaign_df.printSchema()
  49.     # root
  50.     #  |-- id: string (nullable = true) - идентификатор акции
  51.     #  |-- name: string (nullable = true) - наименование акции
  52.     #  |-- point_lat: double (nullable = true) - широта точки/ресторана
  53.     #  |-- point_lon: double (nullable = true) - долгота точки/ресторана
  54.     #  |-- description: string (nullable = true) - описание акции
  55.     #  |-- start_time: timestamp (nullable = true) - время начала акции
  56.     #  |-- end_time: timestamp (nullable = true) - время конца акции
  57.     #  |-- radius: short (nullable = true) - возможное расстояние до клиента
  58.     df = spark_session.read \
  59.         .format('jdbc') \
  60.         .options(**postgresql_settings) \
  61.         .load()
  62.     return df
  63.  
  64.  
  65. def read_client_stream(spark_session: SparkSession) -> DataFrame:
  66.     # Не забудьте дедуплицировать данные, когда будете читать клиентский стрим.
  67.     #
  68.     # - Схема DataFrame клиентского стрима:
  69.     #
  70.     # >> client_stream_df.printSchema()
  71.  
  72.     df_src = spark_session.readStream.format('kafka') \
  73.         .options(**kafka_security_options) \
  74.         .option('kafka.bootstrap.servers', kafka_bootstrap_servers) \
  75.         .option('subscribe', 'student.topic.cohort5.larisfdz') \
  76.         .load()
  77.  
  78.     df_jsoned = df_src \
  79.         .withColumn('key_str', f.col('key').cast(StringType())) \
  80.         .withColumn('value_json', f.col('value').cast(StringType())) \
  81.         .drop('key', 'value')
  82.  
  83.     # schema_of_json more soft than tz
  84.     # schema4value = f.schema_of_json(df_jsoned.select('value_json').head()[0])
  85.  
  86.     schema4value = StructType([
  87.         StructField("client_id", StringType(), nullable=True),
  88.         StructField("timestamp", TimestampType(), nullable=True),
  89.         StructField("lat", DoubleType(), nullable=True),
  90.         StructField("lon", DoubleType(), nullable=True),
  91.     ])
  92.  
  93.     df_stringed = df_jsoned \
  94.         .withColumn('key', F.col('key_str')) \
  95.         .withColumn('value', F.from_json(f.col('value_json'), schema4value)) \
  96.         .drop('key_str', 'value_json')
  97.  
  98.     df_schemized = df_stringed \
  99.         .select(
  100.             F.col('value.client_id').cast(StringType()).alias('client_id'),
  101.             F.col('value.timestamp').cast(TimestampType()).alias('timestamp'),
  102.             F.col('value.lat').cast(DoubleType()).alias('lat'),
  103.             F.col('value.lon').cast(DoubleType()).alias('lon'),
  104.             F.col('offset').cast(LongType()),
  105.         ) \
  106.         .dropDuplicates(['client_id', 'timestamp']) \
  107.         .withWatermark('timestamp', '2 minutes')
  108.  
  109.     return df_schemized
  110.  
  111.  
  112. def join(user_df, adv_df) -> DataFrame:
  113.     #
  114.     # Код выходного сообщения возьмите из предыдущего урока (не забудьте обернуть его в `value`).
  115.     #
  116.     # {
  117.     #     "client_id": идентификатор клиента,
  118.     #     "distance": дистанция между клиентом и точкой ресторана,
  119.     #     "adv_campaign_id": идентификатор рекламной акции,
  120.     #     "adv_campaign_name": название рекламной акции,
  121.     #     "adv_campaign_description": описание рекламной акции,
  122.     #     "adv_campaign_start_time": время начала акции,
  123.     #     "adv_campaign_end_time": время окончания акции,
  124.     #     "adv_campaign_point_lat": расположение ресторана/точки широта,
  125.     #     "adv_campaign_point_lon": расположение ресторана/долгота широта,
  126.     #     "created_at": время создания выходного ивента,
  127.     # }
  128.     #
  129.     meters = 1000
  130.     df = user_df \
  131.         .crossJoin(adv_df) \
  132.         .withColumn(
  133.             'distance',
  134.             F.acos(
  135.                 F.sin(adv_df.point_lat) * F.sin(user_df.lat)
  136.                 + F.cos(adv_df.point_lat) * F.cos(user_df.lat) * F.cos(adv_df.point_lon - user_df.lon)
  137.             ) * F.lit(6371)
  138.         ) \
  139.         .filter(F.col("distance") <= meters) \
  140.         .withColumn("created_at", F.current_timestamp()) \
  141.         .withColumn('value', F.to_json(F.struct(
  142.             user_df.client_id,
  143.             F.col('distance'),
  144.             adv_df.id.alias('adv_campaign_id'),
  145.             adv_df.name.alias('adv_campaign_name'),
  146.             adv_df.description.alias('adv_campaign_description'),
  147.             adv_df.start_time.alias('adv_campaign_start_time'),
  148.             adv_df.end_time.alias('adv_campaign_end_time'),
  149.             adv_df.point_lat.alias('adv_campaign_point_lat'),
  150.             adv_df.point_lon.alias('adv_campaign_point_lon'),
  151.             F.col('created_at'),
  152.         ))) \
  153.         .select('value')
  154.  
  155.     return df
  156.  
  157. if __name__ == "__main__":
  158.     spark = spark_init('join stream')
  159.     spark.conf.set("spark.sql.streaming.checkpointLocation", "test_query")
  160.     client_stream = read_client_stream(spark)
  161.     marketing_df = read_marketing(spark)
  162.     result = join(client_stream, marketing_df)
  163.  
  164.     query = (result
  165.              .writeStream
  166.              .outputMode("append")
  167.              .format("kafka")
  168.              .option('kafka.bootstrap.servers', kafka_bootstrap_servers)
  169.              .options(**kafka_security_options)
  170.              .option("topic", TOPIC_NAME_91)
  171.              .trigger(processingTime="15 seconds")
  172.              .option("truncate", False)
  173.              .start())
  174.  
  175.     # while query.isActive:
  176.     #     print(f"query information: runId={query.runId}, "
  177.     #           f"status is {query.status}, "
  178.     #           f"recent progress={query.recentProgress}")
  179.     #     sleep(30)
  180.  
  181.     query.awaitTermination()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement