Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from pyspark.sql import SparkSession, DataFrame
- from pyspark.sql import functions as F
- from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType
- # необходимая библиотека с идентификатором в maven
- # вы можете использовать ее с помощью метода .config и опции "spark.jars.packages"
- kafka_lib_id = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0"
- # настройки security для кафки
- # вы можете использовать из с помощью метода .options(**kafka_security_options)
- kafka_security_options = {
- 'kafka.security.protocol': 'SASL_SSL',
- 'kafka.sasl.mechanism': 'SCRAM-SHA-512',
- 'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"de-student\" password=\"ltcneltyn\";',
- }
- def spark_init() -> SparkSession:
- spark = SparkSession.builder\
- .master('local')\
- .appName('test connect to kafka')\
- .config('spark.jars.packages', kafka_lib_id)\
- .getOrCreate()
- return spark
- def load_df(spark: SparkSession) -> DataFrame:
- df = (spark.readStream.format('kafka')
- .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
- .option('kafka.security.protocol', 'SASL_SSL')
- .option('kafka.sasl.mechanism', 'SCRAM-SHA-512')
- .option('kafka.sasl.jaas.config', 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"de-student\" password=\"ltcneltyn\";')
- .option('subscribe', 'student.topic.cohort5.larisfdz')
- .load())
- return df
- # не забудьте здесь указать то-же имя топика что и в TOPIC_NAME
- def transform(df: DataFrame) -> DataFrame:
- #df_des = df.withColumn('value_str', df.value.cast(StringType()))
- #schema = F.schema_of_json(df_des.select('value_str').head()[0])
- schema_true = StructType([
- StructField("lat", DoubleType()),
- StructField("client_id", StringType()),
- StructField("lon", DoubleType()),
- StructField("timestamp", TimestampType())
- ])
- df_res = (df.withColumn('value', F.col('value').cast(StringType()))\
- .withColumn('key', F.col('key').cast(StringType()))\
- .withColumn('event', F.from_json(F.col('value'), schema_true))\
- .select('event.lat', 'event.timestamp', 'event.lon', 'event.client_id')
- )
- return df_res
- spark = spark_init()
- source_df = load_df(spark)
- output_df = transform(source_df)
- query = (output_df.writeStream.outputMode("append").format("console").option("truncate", False).trigger(once=True).start())
- try:
- query.awaitTermination()
- finally:
- query.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement