Advertisement
laris_fdz

Untitled

Jan 30th, 2023
2,033
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 7.94 KB | None | 0 0
  1. #!/usr/bin/env python#
  2. from datetime import datetime
  3.  
  4. from pyspark.sql import DataFrame, SparkSession
  5. from pyspark.sql import functions as F
  6. from pyspark.sql.functions import col, from_json, lit, struct, to_json
  7. from pyspark.sql.types import (LongType, StringType, StructField, StructType, LongType
  8.                                TimestampType)
  9.  
  10. topic_in = 'larisfdz_in'
  11. topic_out = 'larisfdz_out'
  12. app_name = 'RestaurantSubscribeStreamingService'
  13.  
  14. kafka_security_options = {
  15.     'kafka.security.protocol': 'SASL_SSL',
  16.     'kafka.sasl.mechanism': 'SCRAM-SHA-512',
  17.     'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";',
  18. }
  19. kafka_bootstrap_servers = 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091'
  20.  
  21. postgresql_settings = {
  22.     'user': 'jovyan',
  23.     'password': 'jovyan',
  24.     'url': 'jdbc:postgresql://localhost:5432/de',
  25.     'driver': 'org.postgresql.Driver',
  26.     'dbtable': 'public.subscribers_feedback',
  27.     # 'user': 'student',
  28.     # 'password': 'de-student',
  29. }
  30.  
  31. # необходимые библиотеки для интеграции Spark с Kafka и PostgreSQL
  32. spark_jars_packages = ','.join(
  33.     [
  34.         'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0',
  35.         'org.postgresql:postgresql:42.4.0',
  36.     ]
  37. )
  38.  
  39. # определяем текущее время в UTC в миллисекундах
  40. current_timestamp_utc = long(round(datetime.utcnow().timestamp()))
  41.  
  42. # метод для записи данных в 2 target: в PostgreSQL для фидбэков и в Kafka для триггеров
  43.  
  44. def foreach_batch_function(df, epoch_id):
  45.     # сохраняем df в памяти, чтобы не создавать df заново перед отправкой в Kafka
  46.     df.persist()
  47.  
  48.     # записываем df в PostgreSQL с полем feedback
  49.     df_to_db = (df.withColumn('feedback', lit(None).cast(StringType())))
  50.     # создаём df для отправки в Kafka. Сериализация в json.
  51.     df_to_stream = (df.select(to_json(struct(col('*'))).alias('value')) \
  52.         .select('value'))
  53.  
  54.     df_to_db.write.format('jdbc').mode('append') \
  55.     .options(**postgresql_settings).save()
  56.  
  57.     print('WRITTEN TO DF')
  58.  
  59.     # отправляем сообщения в результирующий топик Kafka без поля feedback
  60.     df_to_stream.write \
  61.         .format('kafka') \
  62.         .option('kafka.bootstrap.servers', kafka_bootstrap_servers) \
  63.         .options(**kafka_security_options) \
  64.         .option('topic', topic_out) \
  65.         .option('truncate', False) \
  66.         .save()
  67.  
  68.     print('WRITTEN TO STREAM')
  69.  
  70.      
  71.    
  72.  
  73.     # очищаем память от df
  74.     df.unpersist()
  75.  
  76. # создаём spark сессию с необходимыми библиотеками в spark_jars_packages для интеграции с Kafka и PostgreSQL
  77.  
  78.  
  79. def spark_init(app_name):
  80.     spark = SparkSession.builder \
  81.         .appName(app_name) \
  82.         .config('spark.sql.session.timeZone', 'UTC') \
  83.         .config('spark.jars.packages', spark_jars_packages) \
  84.         .getOrCreate()
  85.  
  86.     print('SPARK_INIT DONE')
  87.  
  88.     return spark
  89.  
  90. # читаем из топика Kafka сообщения с акциями от ресторанов
  91.  
  92.  
  93. def read_stream(spark):
  94.     df = spark.readStream \
  95.         .format('kafka') \
  96.         .options(**kafka_security_options) \
  97.         .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091') \
  98.         .option('subscribe', topic_in) \
  99.         .load()
  100.     # .option('kafka.ssl.truststore.location', '/usr/lib/jvm/java-1.17.0-openjdk-amd64/lib/security/cacerts') \
  101.     # .option('kafka.ssl.truststore.password', 'changeit')
  102.  
  103.     df_json = df.withColumn('key_str', F.col('key').cast(StringType())) \
  104.         .withColumn('value_json', F.col('value').cast(StringType())) \
  105.         .drop('key', 'value')
  106.  
  107.     # определяем схему входного сообщения для json
  108.     incoming_message_schema = StructType([
  109.         StructField('restaurant_id', StringType(), nullable=True),
  110.         StructField('adv_campaign_id', StringType(), nullable=True),
  111.         StructField('adv_campaign_content', StringType(), nullable=True),
  112.         StructField('adv_campaign_owner', StringType(), nullable=True),
  113.         StructField('adv_campaign_owner_contact', StringType(), nullable=True),
  114.         StructField('adv_campaign_datetime_start', LongType(), nullable=True),
  115.         StructField('adv_campaign_datetime_end', LongType(), nullable=True),
  116.         StructField('datetime_created', LongType(), nullable=True),
  117.     ])
  118.  
  119.     print(f'INCOMING MESSAGE SCHEMA: {incoming_message_schema}')
  120.  
  121.     # десериализуем из value сообщения json и фильтруем по времени старта и окончания акции
  122.     df_string = df_json \
  123.         .withColumn('key', F.col('key_str')) \
  124.         .withColumn('value', F.from_json(F.col('value_json'), incoming_message_schema)) \
  125.         .drop('key_str', 'value_json')
  126.  
  127.     df_filtered = df_string.select(
  128.         F.col('value.restaurant_id').cast(StringType()).alias('restaurant_id'),
  129.         F.col('value.adv_campaign_id').cast(StringType()).alias('adv_campaign_id'),
  130.         F.col('value.adv_campaign_content').cast(StringType()).alias('adv_campaign_content'),
  131.         F.col('value.adv_campaign_owner').cast(StringType()).alias('adv_campaign_owner'),
  132.         F.col('value.adv_campaign_owner_contact').cast(StringType()).alias('adv_campaign_owner_contact'),
  133.         F.col('value.adv_campaign_datetime_start').cast(LongType()).alias('adv_campaign_datetime_start'),
  134.         F.col('value.adv_campaign_datetime_end').cast(LongType()).alias('adv_campaign_datetime_end'),
  135.         F.col('value.datetime_created').cast(LongType()).alias('datetime_created'),
  136.     )\
  137.     .filter((F.col('adv_campaign_datetime_start') <= current_timestamp_utc) & (F.col('adv_campaign_datetime_end') > current_timestamp_utc))
  138.  
  139.     return df_filtered
  140.  
  141. # вычитываем всех пользователей с подпиской на рестораны
  142. def subscribers_restaurants(spark):
  143.     df = spark.read \
  144.         .format('jdbc') \
  145.         .option('url', 'jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de') \
  146.         .option('driver', 'org.postgresql.Driver') \
  147.         .option('dbtable', 'subscribers_restaurants') \
  148.         .option('user', 'student') \
  149.         .option('password', 'de-student') \
  150.         .load()
  151.     df_dedup = df.dropDuplicates(['client_id', 'restaurant_id'])
  152.     return df_dedup
  153.  
  154. # джойним данные из сообщения Kafka с пользователями подписки по restaurant_id (uuid). Добавляем время создания события.
  155.  
  156. def join(restaurant_read_stream_df, subscribers_restaurant_df):
  157.     df = restaurant_read_stream_df \
  158.         .join(subscribers_restaurant_df, 'restaurant_id') \
  159.         .withColumn('trigger_datetime_created', F.lit(current_timestamp_utc))\
  160.         .select(
  161.             'restaurant_id',
  162.             'adv_campaign_id',
  163.             'adv_campaign_content',
  164.             'adv_campaign_owner',
  165.             'adv_campaign_owner_contact',
  166.             'adv_campaign_datetime_start',
  167.             'adv_campaign_datetime_end',
  168.             'datetime_created',
  169.             'client_id',
  170.             'trigger_datetime_created')
  171.     return df
  172. # запускаем стриминг
  173.  
  174.  
  175. if __name__ == '__main__':
  176.     spark = spark_init(app_name)
  177.     spark.conf.set('spark.sql.streaming.checkpointLocation', 'test_query')
  178.     #client_stream = read_client_stream(spark)
  179.     restaurant_read_stream_df = read_stream(spark)
  180.     subscribers_restaurant_df = subscribers_restaurants(spark)
  181.     result = join(restaurant_read_stream_df, subscribers_restaurant_df)
  182.  
  183.     result.writeStream \
  184.     .foreachBatch(foreach_batch_function) \
  185.     .start() \
  186.     .awaitTermination(10)
  187.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement