Advertisement
Guest User

Untitled

a guest
Jun 24th, 2019
125
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.42 KB | None | 0 0
  1. from pyspark import SparkContext
  2. from pyspark.sql import SQLContext, SparkSession
  3.  
  4. from pyspark.sql import SparkSession
  5. from pyspark.streaming import StreamingContext
  6. from pyspark.streaming.kafka import KafkaUtils
  7.  
  8. spark = SparkSession.builder.appName("KafkaConsumer").getOrCreate()
  9. sc = spark.sparkContext
  10. sqlc = SQLContext(sc)
  11.  
  12. hosts = "host1:9092,host2:9092,host3:9092"
  13. topic = "myTopic"
  14. securityProtocol = "SASL_PLAINTEXT"
  15. saslMechanism = "PLAIN"
  16.  
  17. try:
  18. df = sqlc
  19. .readStream
  20. .format("kafka")
  21. .option("kafka.bootstrap.servers", hosts)
  22. .option("kafka.security.protocol", securityProtocol)
  23. .option("kafka.sasl.mechanism", saslMechanism)
  24. .option("startingOffsets", "earliest")
  25. .option("subscribe", topic)
  26. .load()
  27.  
  28. dss = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  29. .writeStream.outputMode('append')
  30. .format("console")
  31. .start()
  32.  
  33. dss.awaitTermination()
  34. except KeyboardInterrupt:
  35. print 'shutting down...'
  36.  
  37. KafkaClient {
  38. org.apache.kafka.common.security.plain.PlainLoginModule required
  39. username="user1"
  40. password="sssshhhh"
  41. serviceName="kafka";
  42. };
  43.  
  44. spark-submit
  45. --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1
  46. --files "kafka.jaas"
  47. --driver-java-options "-Djava.security.auth.login.config=kafka.jaas"
  48. --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka.jaas"
  49. "./consumer.py"
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement