from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType

# Dependencies fetched from Maven Central at startup via spark.jars.packages:
# the Kafka source for Structured Streaming and the Postgres JDBC driver.
spark_jars_packages = ",".join(
    [
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
        "org.postgresql:postgresql:42.4.0",
    ]
)

def spark_init(test_name: str) -> SparkSession:
    """Create a SparkSession with the Kafka and Postgres packages attached."""
    spark = (
        SparkSession.builder
        .appName(test_name)
        .config("spark.jars.packages", spark_jars_packages)
        .getOrCreate()
    )
    return spark

# Connection settings for the source Postgres database.
DB_HOST = 'rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net'
DB_PORT = '6432'
DB_NAME = 'de'
DB_USER = 'student'
DB_PASSWORD = 'de-student'
DB_TABLE = 'marketing_companies'

def read_marketing(spark: SparkSession) -> DataFrame:
    """Read the static marketing-campaigns table from Postgres over JDBC."""
    df = (
        spark.read.format("jdbc")
        .option("url", f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}")
        .option("user", DB_USER)
        .option("password", DB_PASSWORD)
        .option("dbtable", DB_TABLE)
        .option("driver", "org.postgresql.Driver")
        .load()
    )
    return df

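# A quick sanity check (illustrative, not part of the job itself): preview the
# campaigns table with
#   read_marketing(spark).show(5, truncate=False)
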
# SASL_SSL credentials for the Kafka source, collected in one place so the
# stream reader does not repeat them inline.
kafka_security_options = {
    'kafka.security.protocol': 'SASL_SSL',
    'kafka.sasl.mechanism': 'SCRAM-SHA-512',
    'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username="de-student" password="ltcneltyn";',
}

def read_client_stream(spark: SparkSession) -> DataFrame:
    """Read the client-coordinates stream from Kafka and deduplicate it."""
    df = (
        spark.readStream.format('kafka')
        .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
        .options(**kafka_security_options)
        .option('subscribe', 'student.topic.cohort5.larisfdz')
        .load()
    )

    # Shape of the JSON payload carried in the Kafka `value` column.
    schema_true = StructType([
        StructField("lat", DoubleType()),
        StructField("client_id", StringType()),
        StructField("lon", DoubleType()),
        StructField("timestamp", TimestampType()),
    ])

    # Note: the unqualified `timestamp` selected below is the Kafka record
    # timestamp (a metadata column of the source), not `event.timestamp`
    # from the JSON payload.
    df_res = (
        df.withColumn('value', F.col('value').cast(StringType()))
        .withColumn('key', F.col('key').cast(StringType()))
        .withColumn('event', F.from_json(F.col('value'), schema_true))
        .select('event.lat', 'timestamp', 'event.lon', 'event.client_id')
    )

    # Drop repeated coordinates per client within a 5-minute watermark window.
    df_dedup = (
        df_res.withWatermark('timestamp', '5 minutes')
        .dropDuplicates(['client_id', 'lat', 'lon'])
    )

    return df_dedup

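# Example payload (illustrative, matching schema_true above):
# {"client_id": "a1b2c3", "lat": 55.7512, "lon": 37.6184, "timestamp": "2023-02-02 12:00:00"}
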
def join(user_df: DataFrame, marketing_df: DataFrame) -> DataFrame:
    """Attach every marketing campaign to every client event (cross join)."""
    df_joined = (
        user_df.crossJoin(marketing_df.alias('marketing_df'))
        .select('client_id', 'lat', 'lon',
                F.col('marketing_df.id').alias('adv_campaign_id'),
                F.col('marketing_df.name').alias('adv_campaign_name'),
                F.col('marketing_df.description').alias('adv_campaign_description'),
                F.col('marketing_df.start_time').alias('adv_campaign_start_time'),
                F.col('marketing_df.end_time').alias('adv_campaign_end_time'),
                F.col('marketing_df.point_lat').alias('adv_campaign_point_lat'),
                F.col('marketing_df.point_lon').alias('adv_campaign_point_lon'))
    )
    return df_joined

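# --- Illustrative extension (a sketch, not part of the original job) --------
# The cross join pairs every client with every campaign; a plausible next step
# is to keep only clients close to a campaign point. The haversine expression
# and the 1 km default radius here are assumptions for illustration.
def filter_by_distance(df_joined: DataFrame, radius_m: float = 1000.0) -> DataFrame:
    earth_r = 6371000.0  # mean Earth radius, metres
    dist = 2 * earth_r * F.asin(F.sqrt(
        F.pow(F.sin(F.radians(F.col('adv_campaign_point_lat') - F.col('lat')) / 2), 2)
        + F.cos(F.radians(F.col('lat')))
        * F.cos(F.radians(F.col('adv_campaign_point_lat')))
        * F.pow(F.sin(F.radians(F.col('adv_campaign_point_lon') - F.col('lon')) / 2), 2)
    ))
    return df_joined.where(dist <= radius_m)
# usage: result = filter_by_distance(join(client_stream, marketing_df))
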
if __name__ == "__main__":
    spark = spark_init('testest')
    marketing_df = read_marketing(spark)
    client_stream = read_client_stream(spark)
    result = join(client_stream, marketing_df)

    # Print each micro-batch to the console for inspection.
    query = (result
             .writeStream
             .outputMode("append")
             .format("console")
             .option("truncate", False)
             .start())
    query.awaitTermination()
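
# --- Alternative sink (a sketch, not part of the original job) --------------
# Instead of the console, each micro-batch could be appended to Postgres via
# foreachBatch. The target table name "marketing_matches" is hypothetical.
#
# def write_to_postgres(batch_df: DataFrame, batch_id: int) -> None:
#     (batch_df.write.format("jdbc")
#      .option("url", f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}")
#      .option("user", DB_USER)
#      .option("password", DB_PASSWORD)
#      .option("dbtable", "marketing_matches")  # hypothetical target table
#      .option("driver", "org.postgresql.Driver")
#      .mode("append")
#      .save())
#
# query = result.writeStream.foreachBatch(write_to_postgres).start()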