Advertisement
laris_fdz

Untitled

Feb 2nd, 2023
697
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.81 KB | None | 0 0
  1. from pyspark.sql import SparkSession, DataFrame
  2. from pyspark.sql import functions as F
  3. from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType
  4.  
  5. # необходимая библиотека с идентификатором в maven
  6. # вы можете использовать ее с помощью метода .config и опции "spark.jars.packages"
  7. kafka_lib_id = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"
  8.  
  9. # настройки security для кафки
  10. # вы можете использовать из с помощью метода .options(**kafka_security_options)
  11. kafka_security_options = {
  12.     'kafka.security.protocol': 'SASL_SSL',
  13.     'kafka.sasl.mechanism': 'SCRAM-SHA-512',
  14.     'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"de-student\" password=\"ltcneltyn\";',
  15. }
  16.  
  17. def spark_init() -> SparkSession:
  18.     spark = SparkSession.builder\
  19.         .master('local')\
  20.         .appName('test connect to kafka')\
  21.         .config('spark.jars.packages', kafka_lib_id)\
  22.         .getOrCreate()
  23.     return spark
  24.  
  25. def load_df(spark: SparkSession) -> DataFrame:
  26.     df = (spark.readStream.format('kafka')
  27.           .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
  28.           .option('kafka.security.protocol', 'SASL_SSL')
  29.           .option('kafka.sasl.mechanism', 'SCRAM-SHA-512')
  30.           .option('kafka.sasl.jaas.config', 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"de-student\" password=\"ltcneltyn\";')
  31.           .option('subscribe', 'student.topic.cohort5.larisfdz')
  32.           .load())
  33.     return df
  34.     # не забудьте здесь указать то-же имя топика что и в TOPIC_NAME
  35.  
  36. def transform(df: DataFrame) -> DataFrame:
  37.     #df_des = df.withColumn('value_str', df.value.cast(StringType()))
  38.     #schema = F.schema_of_json(df_des.select('value_str').head()[0])
  39.  
  40.     schema_true = StructType([
  41.             StructField("lat", DoubleType()),
  42.             StructField("client_id", StringType()),
  43.             StructField("lon", DoubleType()),
  44.             StructField("timestamp", TimestampType())
  45.             ])
  46.     df_res = (df.withColumn('value', F.col('value').cast(StringType()))\
  47.         .withColumn('key', F.col('key').cast(StringType()))\
  48.             .withColumn('event', F.from_json(F.col('value'), schema_true))\
  49.                 .select('event.lat', 'event.timestamp', 'event.lon', 'event.client_id')
  50.         )
  51.     return df_res
  52.  
  53. spark = spark_init()
  54.  
  55. source_df = load_df(spark)
  56. output_df = transform(source_df)
  57.  
  58. query = (output_df.writeStream.outputMode("append").format("console").option("truncate", False).trigger(once=True).start())
  59.  
  60. try:
  61.     query.awaitTermination()
  62. finally:
  63.     query.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement