Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
# NOTE: StreamingContext/KafkaUtils are the legacy DStream API and are not
# used by this Structured Streaming consumer; kept in case other (unseen)
# code in this file relies on them.
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

"""Structured Streaming consumer: reads a SASL/PLAIN-secured Kafka topic
and echoes key/value pairs to the console until interrupted."""

spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)  # retained for compatibility; streaming reads go through `spark`

hosts = "host1:9092,host2:9092,host3:9092"  # Kafka bootstrap brokers
topic = "myTopic"
securityProtocol = "SASL_PLAINTEXT"
saslMechanism = "PLAIN"  # credentials come from the JAAS file passed to spark-submit

try:
    # Kafka source for Structured Streaming. The fluent chain must be
    # wrapped in parentheses so it parses as one expression.
    df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", hosts)
        .option("kafka.security.protocol", securityProtocol)
        .option("kafka.sasl.mechanism", saslMechanism)
        .option("startingOffsets", "earliest")
        .option("subscribe", topic)
        .load()
    )

    # Kafka delivers key/value as binary; cast to strings for display.
    dss = (
        df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .outputMode('append')
        .format("console")
        .start()
    )
    dss.awaitTermination()
except KeyboardInterrupt:
    # Ctrl-C: exit cleanly instead of dumping a traceback.
    print('shutting down...')
// JAAS login entry used by the Kafka client (SASL/PLAIN credentials).
// Passed to Spark via -Djava.security.auth.login.config=kafka.jaas on
// both driver and executors (see the spark-submit invocation).
// NOTE(review): serviceName is a Kerberos-oriented option and is typically
// ignored by PlainLoginModule — confirm it is needed here.
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="user1"
password="sssshhhh"
serviceName="kafka";
};
# Submit the consumer with the Kafka source package (must match the
# cluster's Spark/Scala versions: here Spark 2.3.1 on Scala 2.11).
# --files ships kafka.jaas to executors; the -D option points both the
# driver and executor JVMs at it for SASL authentication.
# Trailing backslashes are required — without them each option line is
# parsed as a separate shell command.
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 \
  --files "kafka.jaas" \
  --driver-java-options "-Djava.security.auth.login.config=kafka.jaas" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka.jaas" \
  "./consumer.py"
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement