Advertisement
cawachikago

Stream

May 30th, 2023 (edited)
509
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
SPARK 2.09 KB | Fixit | 0 0
  1. from pyspark.sql import SparkSession
  2. from pyspark.sql import functions as f
  3. from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
  4.  
  5.  
  6. spark_jars_packages = ",".join(
  7.     [
  8.         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
  9.         "org.postgresql:postgresql:42.4.0"
  10.     ]
  11. )
  12.  
  13. spark = (
  14.     SparkSession.builder
  15.     .master("local")
  16.     .appName('test connect to kafka')
  17.     .config("spark.jars.packages", spark_jars_packages)
  18.     .getOrCreate()
  19. )
  20.  
  21. truststore_location = "/etc/security/ssl"
  22. truststore_pass = "de_sprint_8"
  23.  
  24. df = (spark.readStream
  25.       .format('kafka')
  26.       .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
  27.       .option('kafka.security.protocol', 'SASL_SSL')
  28.       .option('kafka.sasl.mechanism', 'SCRAM-SHA-512')
  29.       .option('kafka.sasl.jaas.config','org.apache.kafka.common.security.scram.ScramLoginModule required username=\"de-student\" password=\"ltcneltyn\";')
  30.      #.option('kafka.ssl.truststore.location', truststore_location)
  31.      #.option('kafka.ssl.truststore.password', truststore_pass)
  32.      .option("subscribe", "cawachi_in")
  33.      .load())
  34.  
  35. schema = StructType([
  36.    StructField("restaurant_id", StringType()),
  37.    StructField("adv_campaign_id", StringType()),
  38.    StructField("adv_campaign_content", StringType()),
  39.    StructField("adv_campaign_owner", StringType()),
  40.    StructField("adv_campaign_owner_contact", StringType()),
  41.    StructField("adv_campaign_datetime_start", DoubleType()),
  42.    StructField("adv_campaign_datetime_end", DoubleType()),
  43.    StructField("datetime_created", DoubleType())
  44. ])
  45.  
  46. df2 = (df
  47.      .withColumn('value', f.col('value').cast(StringType()))
  48.      .withColumn('key', f.col('key').cast(StringType()))
  49.      .withColumn('event', f.from_json(f.col('value'), schema))
  50.      .selectExpr('event.*', '*').drop('event').drop('value')
  51.      )
  52.  
  53. df2.printSchema()
  54. df2.writeStream \
  55.         .outputMode("append") \
  56.         .format("console") \
  57.         .option("truncate", False) \
  58.         .trigger(once=True) \
  59.         .start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement