Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from datetime import datetime
- from time import sleep
- from pyspark.sql import SparkSession, DataFrame
- from pyspark.sql import functions as F
- from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType, IntegerType, LongType
- TOPIC_NAME_91 = 'student.topic.cohort5.larisfdz.out'
- spark_master = 'local'
- spark_app_name = "coh5_larisfdz_s8-3-9-1"
- spark_jars_packages = ",".join([
- "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
- "org.postgresql:postgresql:42.4.0",
- ])
- postgresql_settings = {
- 'user': 'master',
- 'password': 'de-master-password',
- 'url': 'jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de',
- 'driver': 'org.postgresql.Driver',
- 'dbtable': 'marketing_companies',
- # 'user': 'student',
- # 'password': 'de-student',
- }
- kafka_security_options = {
- 'kafka.security.protocol': 'SASL_SSL',
- 'kafka.sasl.mechanism': 'SCRAM-SHA-512',
- 'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";',
- }
- kafka_bootstrap_servers = 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091'
- def spark_init(app_name) -> SparkSession:
- spark_session = SparkSession.builder \
- .master(spark_master) \
- .appName(app_name) \
- .config("spark.jars.packages", spark_jars_packages) \
- .getOrCreate()
- return spark_session
- def read_marketing(spark_session: SparkSession) -> DataFrame:
- # - Схема DataFrame ресторанных акций:
- #
- # >> adv_campaign_df.printSchema()
- # root
- # |-- id: string (nullable = true) - идентификатор акции
- # |-- name: string (nullable = true) - наименование акции
- # |-- point_lat: double (nullable = true) - широта точки/ресторана
- # |-- point_lon: double (nullable = true) - долгота точки/ресторана
- # |-- description: string (nullable = true) - описание акции
- # |-- start_time: timestamp (nullable = true) - время начала акции
- # |-- end_time: timestamp (nullable = true) - время конца акции
- # |-- radius: short (nullable = true) - возможное расстояние до клиента
- df = spark_session.read \
- .format('jdbc') \
- .options(**postgresql_settings) \
- .load()
- return df
- def read_client_stream(spark_session: SparkSession) -> DataFrame:
- # Не забудьте дедуплицировать данные, когда будете читать клиентский стрим.
- #
- # - Схема DataFrame клиентского стрима:
- #
- # >> client_stream_df.printSchema()
- df_src = spark_session.readStream.format('kafka') \
- .options(**kafka_security_options) \
- .option('kafka.bootstrap.servers', kafka_bootstrap_servers) \
- .option('subscribe', 'student.topic.cohort5.larisfdz') \
- .load()
- df_jsoned = df_src \
- .withColumn('key_str', f.col('key').cast(StringType())) \
- .withColumn('value_json', f.col('value').cast(StringType())) \
- .drop('key', 'value')
- # schema_of_json more soft than tz
- # schema4value = f.schema_of_json(df_jsoned.select('value_json').head()[0])
- schema4value = StructType([
- StructField("client_id", StringType(), nullable=True),
- StructField("timestamp", TimestampType(), nullable=True),
- StructField("lat", DoubleType(), nullable=True),
- StructField("lon", DoubleType(), nullable=True),
- ])
- df_stringed = df_jsoned \
- .withColumn('key', F.col('key_str')) \
- .withColumn('value', F.from_json(f.col('value_json'), schema4value)) \
- .drop('key_str', 'value_json')
- df_schemized = df_stringed \
- .select(
- F.col('value.client_id').cast(StringType()).alias('client_id'),
- F.col('value.timestamp').cast(TimestampType()).alias('timestamp'),
- F.col('value.lat').cast(DoubleType()).alias('lat'),
- F.col('value.lon').cast(DoubleType()).alias('lon'),
- F.col('offset').cast(LongType()),
- ) \
- .dropDuplicates(['client_id', 'timestamp']) \
- .withWatermark('timestamp', '2 minutes')
- return df_schemized
- def join(user_df, adv_df) -> DataFrame:
- #
- # Код выходного сообщения возьмите из предыдущего урока (не забудьте обернуть его в `value`).
- #
- # {
- # "client_id": идентификатор клиента,
- # "distance": дистанция между клиентом и точкой ресторана,
- # "adv_campaign_id": идентификатор рекламной акции,
- # "adv_campaign_name": название рекламной акции,
- # "adv_campaign_description": описание рекламной акции,
- # "adv_campaign_start_time": время начала акции,
- # "adv_campaign_end_time": время окончания акции,
- # "adv_campaign_point_lat": расположение ресторана/точки широта,
- # "adv_campaign_point_lon": расположение ресторана/долгота широта,
- # "created_at": время создания выходного ивента,
- # }
- #
- meters = 1000
- df = user_df \
- .crossJoin(adv_df) \
- .withColumn(
- 'distance',
- F.acos(
- F.sin(adv_df.point_lat) * F.sin(user_df.lat)
- + F.cos(adv_df.point_lat) * F.cos(user_df.lat) * F.cos(adv_df.point_lon - user_df.lon)
- ) * F.lit(6371)
- ) \
- .filter(F.col("distance") <= meters) \
- .withColumn("created_at", F.current_timestamp()) \
- .withColumn('value', F.to_json(F.struct(
- user_df.client_id,
- F.col('distance'),
- adv_df.id.alias('adv_campaign_id'),
- adv_df.name.alias('adv_campaign_name'),
- adv_df.description.alias('adv_campaign_description'),
- adv_df.start_time.alias('adv_campaign_start_time'),
- adv_df.end_time.alias('adv_campaign_end_time'),
- adv_df.point_lat.alias('adv_campaign_point_lat'),
- adv_df.point_lon.alias('adv_campaign_point_lon'),
- F.col('created_at'),
- ))) \
- .select('value')
- return df
- if __name__ == "__main__":
- spark = spark_init('join stream')
- spark.conf.set("spark.sql.streaming.checkpointLocation", "test_query")
- client_stream = read_client_stream(spark)
- marketing_df = read_marketing(spark)
- result = join(client_stream, marketing_df)
- query = (result
- .writeStream
- .outputMode("append")
- .format("kafka")
- .option('kafka.bootstrap.servers', kafka_bootstrap_servers)
- .options(**kafka_security_options)
- .option("topic", TOPIC_NAME_91)
- .trigger(processingTime="15 seconds")
- .option("truncate", False)
- .start())
- # while query.isActive:
- # print(f"query information: runId={query.runId}, "
- # f"status is {query.status}, "
- # f"recent progress={query.recentProgress}")
- # sleep(30)
- query.awaitTermination()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement