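# Spark Structured Streaming job: reads client geo-events from Kafka,
# joins them with marketing campaigns stored in PostgreSQL, and prints
# the joined stream to the console.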
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType
# Connector packages resolved at session start: the Kafka source and the
# PostgreSQL JDBC driver.
spark_jars_packages = ",".join(
    [
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
        "org.postgresql:postgresql:42.4.0",
    ]
)
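# Build a SparkSession with the connector packages on the classpath.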
def spark_init(test_name: str) -> SparkSession:
    spark = (
        SparkSession.builder.appName(test_name)
        .config("spark.jars.packages", spark_jars_packages)
        .getOrCreate()
    )
    return spark
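# Connection settings for the PostgreSQL instance that holds the
# marketing_companies reference table.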
DB_HOST = 'rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net'
DB_PORT = '6432'
DB_NAME = 'de'
DB_USER = 'student'
DB_PASSWORD = 'de-student'
DB_TABLE = 'marketing_companies'
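# Load the campaign reference data over JDBC. This is a plain batch read:
# the result is the static side of the stream-static join below.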
def read_marketing(spark: SparkSession) -> DataFrame:
    df = (
        spark.read.format("jdbc")
        .option("url", f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}")
        .option("user", DB_USER)
        .option("password", DB_PASSWORD)
        .option("dbtable", DB_TABLE)
        .option("driver", "org.postgresql.Driver")
        .load()
    )
    return df
# SASL_SSL settings for the kafka-admin user (kept for reference; the
# consumer below authenticates inline as de-student).
kafka_security_options = {
    'kafka.security.protocol': 'SASL_SSL',
    'kafka.sasl.mechanism': 'SCRAM-SHA-512',
    'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";',
}
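# Consume raw client geo-events from Kafka, parse the JSON payload, and
# deduplicate repeated coordinates per client within the watermark window.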
def read_client_stream(spark: SparkSession) -> DataFrame:
    df = (spark.readStream.format('kafka')
          .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
          .option('kafka.security.protocol', 'SASL_SSL')
          .option('kafka.sasl.mechanism', 'SCRAM-SHA-512')
          .option('kafka.sasl.jaas.config', 'org.apache.kafka.common.security.scram.ScramLoginModule required username="de-student" password="ltcneltyn";')
          .option('subscribe', 'student.topic.cohort5.larisfdz')
          .load())

    # Expected shape of the JSON payload in the Kafka message value.
    schema_true = StructType([
        StructField("lat", DoubleType()),
        StructField("client_id", StringType()),
        StructField("lon", DoubleType()),
        StructField("timestamp", TimestampType()),
    ])

    # Cast the binary key/value to strings, parse the value as JSON, and keep
    # the event fields plus Kafka's own message timestamp (top-level
    # 'timestamp' column), which the watermark below is based on.
    df_res = (df.withColumn('value', F.col('value').cast(StringType()))
              .withColumn('key', F.col('key').cast(StringType()))
              .withColumn('event', F.from_json(F.col('value'), schema_true))
              .select('event.lat', 'timestamp', 'event.lon', 'event.client_id'))

    # Drop repeated (client_id, lat, lon) events; the 5-minute watermark bounds
    # how much deduplication state Spark keeps.
    df_dedup = (df_res.withWatermark('timestamp', '5 minutes')
                .dropDuplicates(['client_id', 'lat', 'lon']))
    return df_dedup
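# For illustration, a hypothetical message value that schema_true would parse:
#   {"client_id": "client-123", "lat": 55.7512, "lon": 37.6184,
#    "timestamp": "2022-06-01T12:00:00"}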
def join(user_df: DataFrame, marketing_df: DataFrame) -> DataFrame:
    # Stream-static cross join: pair every client event in the micro-batch
    # with every campaign row, exposing campaign columns under adv_campaign_*
    # names.
    df_joined = (user_df
                 .crossJoin(marketing_df.alias('marketing_df'))
                 .select('client_id', 'lat', 'lon',
                         F.col('marketing_df.id').alias('adv_campaign_id'),
                         F.col('marketing_df.name').alias('adv_campaign_name'),
                         F.col('marketing_df.description').alias('adv_campaign_description'),
                         F.col('marketing_df.start_time').alias('adv_campaign_start_time'),
                         F.col('marketing_df.end_time').alias('adv_campaign_end_time'),
                         F.col('marketing_df.point_lat').alias('adv_campaign_point_lat'),
                         F.col('marketing_df.point_lon').alias('adv_campaign_point_lon')))
    return df_joined
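# Wire the pipeline together and print each micro-batch to the console sink;
# in append mode only the new rows of each batch are shown.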
if __name__ == "__main__":
    spark = spark_init('testest')
    marketing_df = read_marketing(spark)
    client_stream = read_client_stream(spark)
    result = join(client_stream, marketing_df)

    query = (result
             .writeStream
             .outputMode("append")
             .format("console")
             .option("truncate", False)
             .start())

    query.awaitTermination()
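# Run with e.g. `spark-submit this_script.py` (hypothetical filename); the
# Kafka and JDBC connectors are fetched at startup via spark.jars.packages.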