Advertisement
laris_fdz

Untitled

Jan 28th, 2023
767
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.77 KB | None | 0 0
  1. from datetime import datetime
  2. from time import sleep
  3.  
  4. from pyspark.sql import SparkSession, DataFrame
  5. from pyspark.sql import functions as f
  6. from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType, IntegerType, LongType
  7.  
  8. TOPIC_NAME_91 = 'student.topic.cohort5.larisfdz.out'
  9.  
  10. spark_master = 'local'
  11. spark_app_name = "coh5_larisfdz_s8-3-9-1"
  12. spark_jars_packages = ",".join([
  13.     "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
  14.     "org.postgresql:postgresql:42.4.0",
  15. ])
  16.  
  17. postgresql_settings = {
  18.     'user': 'master',
  19.     'password': 'de-master-password',
  20.     'url': 'jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de',
  21.     'driver': 'org.postgresql.Driver',
  22.     'dbtable': 'marketing_companies',
  23.     # 'user': 'student',
  24.     # 'password': 'de-student',
  25. }
  26.  
  27. kafka_security_options = {
  28.     'kafka.security.protocol': 'SASL_SSL',
  29.     'kafka.sasl.mechanism': 'SCRAM-SHA-512',
  30.     'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";',
  31. }
  32. kafka_bootstrap_servers = 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091'
  33.  
  34.  
  35. def spark_init(app_name) -> SparkSession:
  36.     spark_session = SparkSession.builder \
  37.         .master(spark_master) \
  38.         .appName(app_name) \
  39.         .config("spark.jars.packages", spark_jars_packages) \
  40.         .getOrCreate()
  41.  
  42.     return spark_session
  43.  
  44.  
  45. def read_marketing(spark_session: SparkSession) -> DataFrame:
  46.     # - Схема DataFrame ресторанных акций:
  47.     #
  48.     # >> adv_campaign_df.printSchema()
  49.     # root
  50.     #  |-- id: string (nullable = true) - идентификатор акции
  51.     #  |-- name: string (nullable = true) - наименование акции
  52.     #  |-- point_lat: double (nullable = true) - широта точки/ресторана
  53.     #  |-- point_lon: double (nullable = true) - долгота точки/ресторана
  54.     #  |-- description: string (nullable = true) - описание акции
  55.     #  |-- start_time: timestamp (nullable = true) - время начала акции
  56.     #  |-- end_time: timestamp (nullable = true) - время конца акции
  57.     #  |-- radius: short (nullable = true) - возможное расстояние до клиента
  58.     df = spark_session.read \
  59.         .format('jdbc') \
  60.         .options(**postgresql_settings) \
  61.         .load()
  62.     return df
  63.  
  64.  
  65. def read_client_stream(spark_session: SparkSession) -> DataFrame:
  66.     # Не забудьте дедуплицировать данные, когда будете читать клиентский стрим.
  67.     #
  68.     # - Схема DataFrame клиентского стрима:
  69.     #
  70.     # >> client_stream_df.printSchema()
  71.     # root
  72.     #  |-- client_id: string (nullable = true) - идентификатор клиента
  73.     #  |-- timestamp: timestamp (nullable = true) - время
  74.     #  |-- lat: double (nullable = true) - положение клиента широта
  75.     #  |-- lon: double (nullable = true) - положение клиента долгота
  76.     df_src = spark_session.readStream.format('kafka') \
  77.         .options(**kafka_security_options) \
  78.         .option('kafka.bootstrap.servers', kafka_bootstrap_servers) \
  79.         .option('subscribe', 'student.topic.cohort5.larisfdz') \
  80.         .load()
  81.  
  82.     df_jsoned = df_src \
  83.         .withColumn('key_str', f.col('key').cast(StringType())) \
  84.         .withColumn('value_json', f.col('value').cast(StringType())) \
  85.         .drop('key', 'value')
  86.  
  87.     # schema_of_json more soft than tz
  88.     # schema4value = f.schema_of_json(df_jsoned.select('value_json').head()[0])
  89.  
  90.     schema4value = StructType([
  91.         StructField("client_id", StringType(), nullable=True),
  92.         StructField("timestamp", TimestampType(), nullable=True),
  93.         StructField("lat", DoubleType(), nullable=True),
  94.         StructField("lon", DoubleType(), nullable=True),
  95.     ])
  96.  
  97.     df_stringed = df_jsoned \
  98.         .withColumn('key', f.col('key_str')) \
  99.         .withColumn('value', f.from_json(f.col('value_json'), schema4value)) \
  100.         .drop('key_str', 'value_json')
  101.  
  102.     df_schemized = df_stringed \
  103.         .select(
  104.             f.col('value.client_id').cast(StringType()).alias('client_id'),
  105.             f.col('value.timestamp').cast(TimestampType()).alias('timestamp'),
  106.             f.col('value.lat').cast(DoubleType()).alias('lat'),
  107.             f.col('value.lon').cast(DoubleType()).alias('lon'),
  108.             f.col('offset').cast(LongType()),
  109.         ) \
  110.         .dropDuplicates(['client_id', 'timestamp']) \
  111.         .withWatermark('timestamp', '2 minutes')
  112.  
  113.     return df_schemized
  114.  
  115.  
  116. def join(user_df, adv_df) -> DataFrame:
  117.     #
  118.     # Код выходного сообщения возьмите из предыдущего урока (не забудьте обернуть его в `value`).
  119.     #
  120.     # {
  121.     #     "client_id": идентификатор клиента,
  122.     #     "distance": дистанция между клиентом и точкой ресторана,
  123.     #     "adv_campaign_id": идентификатор рекламной акции,
  124.     #     "adv_campaign_name": название рекламной акции,
  125.     #     "adv_campaign_description": описание рекламной акции,
  126.     #     "adv_campaign_start_time": время начала акции,
  127.     #     "adv_campaign_end_time": время окончания акции,
  128.     #     "adv_campaign_point_lat": расположение ресторана/точки широта,
  129.     #     "adv_campaign_point_lon": расположение ресторана/долгота широта,
  130.     #     "created_at": время создания выходного ивента,
  131.     # }
  132.     #
  133.     kilometers = 1
  134.     meters = 1000
  135.     df = user_df \
  136.         .crossJoin(adv_df) \
  137.         .withColumn(
  138.             'distance',
  139.             f.acos(
  140.                 f.sin(adv_df.point_lat) * f.sin(user_df.lat)
  141.                 + f.cos(adv_df.point_lat) * f.cos(user_df.lat) * f.cos(adv_df.point_lon - user_df.lon)
  142.             ) * f.lit(6371)
  143.         ) \
  144.         .filter(f.col("distance") <= meters) \
  145.         .withColumn("created_at", f.current_timestamp()) \
  146.         .withColumn('value', f.to_json(f.struct(
  147.             user_df.client_id,
  148.             f.col('distance'),
  149.             adv_df.id.alias('adv_campaign_id'),
  150.             adv_df.name.alias('adv_campaign_name'),
  151.             adv_df.description.alias('adv_campaign_description'),
  152.             adv_df.start_time.alias('adv_campaign_start_time'),
  153.             adv_df.end_time.alias('adv_campaign_end_time'),
  154.             adv_df.point_lat.alias('adv_campaign_point_lat'),
  155.             adv_df.point_lon.alias('adv_campaign_point_lon'),
  156.             f.col('created_at'),
  157.         ))) \
  158.         .select('value')
  159.  
  160.     return df
  161.  
  162. if __name__ == "__main__":
  163.     spark = spark_init('join stream')
  164.     spark.conf.set("spark.sql.streaming.checkpointLocation", "test_query")
  165.     client_stream = read_client_stream(spark)
  166.     marketing_df = read_marketing(spark)
  167.     result = join(client_stream, marketing_df)
  168.  
  169.     query = (result
  170.              .writeStream
  171.              .outputMode("append")
  172.              .format("kafka")
  173.              .option('kafka.bootstrap.servers', kafka_bootstrap_servers)
  174.              .options(**kafka_security_options)
  175.              .option("topic", TOPIC_NAME_91)
  176.              .trigger(processingTime="15 seconds")
  177.              .option("truncate", False)
  178.              .start())
  179.  
  180.     # while query.isActive:
  181.     #     print(f"query information: runId={query.runId}, "
  182.     #           f"status is {query.status}, "
  183.     #           f"recent progress={query.recentProgress}")
  184.     #     sleep(30)
  185.  
  186.     query.awaitTermination()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement