Advertisement
laris_fdz

Untitled

Jan 31st, 2023
711
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.92 KB | None | 0 0
  1. #!/usr/bin/env python#
  2. from datetime import datetime
  3.  
  4. from pyspark.sql import DataFrame, SparkSession
  5. from pyspark.sql import functions as F
  6. from pyspark.sql.functions import col, from_json, lit, struct, to_json
  7. from pyspark.sql.types import (LongType, StringType, StructField, StructType, TimestampType)
  8.  
  9. topic_in = 'larisfdz_inn'
  10. topic_out = 'larisfdz_outt'
  11. app_name = 'RestaurantSubscribeStreamingService'
  12.  
  13. kafka_security_options = {
  14.     'kafka.security.protocol': 'SASL_SSL',
  15.     'kafka.sasl.mechanism': 'SCRAM-SHA-512',
  16.     'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";',
  17. }
  18. kafka_bootstrap_servers = 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091'
  19.  
  20. postgresql_settings = {
  21.     'user': 'jovyan',
  22.     'password': 'jovyan',
  23.     'url': 'jdbc:postgresql://localhost:5432/de',
  24.     'driver': 'org.postgresql.Driver',
  25.     'dbtable': 'public.subscribers_feedback',
  26.     # 'user': 'student',
  27.     # 'password': 'de-student',
  28. }
  29.  
  30. # необходимые библиотеки для интеграции Spark с Kafka и PostgreSQL
  31. spark_jars_packages = ','.join(
  32.     [
  33.         'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0',
  34.         'org.postgresql:postgresql:42.4.0',
  35.     ]
  36. )
  37.  
  38. # определяем текущее время в UTC в миллисекундах
  39. current_timestamp_utc = int(round(datetime.utcnow().timestamp()))
  40.  
  41. # метод для записи данных в 2 target: в PostgreSQL для фидбэков и в Kafka для триггеров
  42.  
  43. def foreach_batch_function(df, epoch_id):
  44.     # сохраняем df в памяти, чтобы не создавать df заново перед отправкой в Kafka
  45.     df.persist()
  46.  
  47.     # записываем df в PostgreSQL с полем feedback
  48.     df_to_db = (df.withColumn('feedback', lit(None).cast(StringType())))
  49.     # создаём df для отправки в Kafka. Сериализация в json.
  50.     df_to_stream = (df.select(to_json(struct(col('*'))).alias('value')) \
  51.         .select('value'))
  52.  
  53.     df_to_db.write.format('jdbc').mode('append') \
  54.     .options(**postgresql_settings).save()
  55.  
  56.     print('WRITTEN TO DF')
  57.  
  58.     # отправляем сообщения в результирующий топик Kafka без поля feedback
  59.     df_to_stream.write \
  60.         .format('kafka') \
  61.         .option('kafka.bootstrap.servers', kafka_bootstrap_servers) \
  62.         .options(**kafka_security_options) \
  63.         .option('topic', topic_out) \
  64.         .option('truncate', False) \
  65.         .save()
  66.  
  67.     print('WRITTEN TO STREAM')
  68.  
  69.      
  70.    
  71.  
  72.     # очищаем память от df
  73.     df.unpersist()
  74.  
  75. # создаём spark сессию с необходимыми библиотеками в spark_jars_packages для интеграции с Kafka и PostgreSQL
  76.  
  77.  
  78. def spark_init(app_name):
  79.     spark = SparkSession.builder \
  80.         .appName(app_name) \
  81.         .config('spark.sql.session.timeZone', 'UTC') \
  82.         .config('spark.jars.packages', spark_jars_packages) \
  83.         .getOrCreate()
  84.  
  85.     print('SPARK_INIT DONE')
  86.  
  87.     return spark
  88.  
  89. # читаем из топика Kafka сообщения с акциями от ресторанов
  90.  
  91.  
  92. def read_stream(spark):
  93.     df = spark.readStream \
  94.         .format('kafka') \
  95.         .options(**kafka_security_options) \
  96.         .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091') \
  97.         .option('subscribe', topic_in) \
  98.         .load()
  99.     # .option('kafka.ssl.truststore.location', '/usr/lib/jvm/java-1.17.0-openjdk-amd64/lib/security/cacerts') \
  100.     # .option('kafka.ssl.truststore.password', 'changeit')
  101.  
  102.     df_json = df.withColumn('key_str', F.col('key').cast(StringType())) \
  103.         .withColumn('value_json', F.col('value').cast(StringType())) \
  104.         .drop('key', 'value')
  105.  
  106.     # определяем схему входного сообщения для json
  107.     incoming_message_schema = StructType([
  108.         StructField('restaurant_id', StringType(), nullable=True),
  109.         StructField('adv_campaign_id', StringType(), nullable=True),
  110.         StructField('adv_campaign_content', StringType(), nullable=True),
  111.         StructField('adv_campaign_owner', StringType(), nullable=True),
  112.         StructField('adv_campaign_owner_contact', StringType(), nullable=True),
  113.         StructField('adv_campaign_datetime_start', LongType(), nullable=True),
  114.         StructField('adv_campaign_datetime_end', LongType(), nullable=True),
  115.         StructField('datetime_created', LongType(), nullable=True),
  116.     ])
  117.  
  118.     print(f'INCOMING MESSAGE SCHEMA: {incoming_message_schema}')
  119.  
  120.     # десериализуем из value сообщения json и фильтруем по времени старта и окончания акции
  121.     df_string = df_json \
  122.         .withColumn('key', F.col('key_str')) \
  123.         .withColumn('value', F.from_json(F.col('value_json'), incoming_message_schema)) \
  124.         .drop('key_str', 'value_json')
  125.  
  126.     df_filtered = df_string.select(
  127.         F.col('value.restaurant_id').cast(StringType()).alias('restaurant_id'),
  128.         F.col('value.adv_campaign_id').cast(StringType()).alias('adv_campaign_id'),
  129.         F.col('value.adv_campaign_content').cast(StringType()).alias('adv_campaign_content'),
  130.         F.col('value.adv_campaign_owner').cast(StringType()).alias('adv_campaign_owner'),
  131.         F.col('value.adv_campaign_owner_contact').cast(StringType()).alias('adv_campaign_owner_contact'),
  132.         F.col('value.adv_campaign_datetime_start').cast(LongType()).alias('adv_campaign_datetime_start'),
  133.         F.col('value.adv_campaign_datetime_end').cast(LongType()).alias('adv_campaign_datetime_end'),
  134.         F.col('value.datetime_created').cast(LongType()).alias('datetime_created'),
  135.     )\
  136.     .filter((F.col('adv_campaign_datetime_start') <= current_timestamp_utc) & (F.col('adv_campaign_datetime_end') > current_timestamp_utc))
  137.  
  138.     return df_filtered
  139.  
  140. # вычитываем всех пользователей с подпиской на рестораны
  141. def subscribers_restaurants(spark):
  142.     df = spark.read \
  143.         .format('jdbc') \
  144.         .option('url', 'jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de') \
  145.         .option('driver', 'org.postgresql.Driver') \
  146.         .option('dbtable', 'subscribers_restaurants') \
  147.         .option('user', 'student') \
  148.         .option('password', 'de-student') \
  149.         .load()
  150.     df_dedup = df.dropDuplicates(['client_id', 'restaurant_id'])
  151.     return df_dedup
  152.  
  153. # джойним данные из сообщения Kafka с пользователями подписки по restaurant_id (uuid). Добавляем время создания события.
  154.  
  155. def join(restaurant_read_stream_df, subscribers_restaurant_df):
  156.     df = restaurant_read_stream_df \
  157.         .join(subscribers_restaurant_df, 'restaurant_id') \
  158.         .withColumn('trigger_datetime_created', F.lit(current_timestamp_utc))\
  159.         .select(
  160.             'restaurant_id',
  161.             'adv_campaign_id',
  162.             'adv_campaign_content',
  163.             'adv_campaign_owner',
  164.             'adv_campaign_owner_contact',
  165.             'adv_campaign_datetime_start',
  166.             'adv_campaign_datetime_end',
  167.             'datetime_created',
  168.             'client_id',
  169.             'trigger_datetime_created')
  170.     return df
  171. # запускаем стриминг
  172.  
  173.  
  174. if __name__ == '__main__':
  175.     spark = spark_init(app_name)
  176.     spark.conf.set('spark.sql.streaming.checkpointLocation', 'test_query')
  177.     #client_stream = read_client_stream(spark)
  178.     restaurant_read_stream_df = read_stream(spark)
  179.     subscribers_restaurant_df = subscribers_restaurants(spark)
  180.     result = join(restaurant_read_stream_df, subscribers_restaurant_df)
  181.  
  182.     query = (result.writeStream.outputMode("append").format("console").option("truncate", False).start())
  183.    
  184.     query.awaitTermination()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement