Advertisement
Guest User

Untitled

a guest
Feb 21st, 2020
122
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.99 KB | None | 0 0
  1. from pyspark import SparkContext
  2. from pyspark.sql import SparkSession
  3. import configparser
  4.  
  5. iniFileLocation = '/home/consumer/config.ini'
  6. config = configparser.ConfigParser()
  7. config.read(iniFileLocation)
  8.  
  9. #KAFKA configuration
  10. bootstrapserver = config['kafka']['kafka.bootstrap.servers']
  11. topics = config['kafka']['subscribe']
  12. autocommit = config['kafka']['enable.auto.commit']
  13. autocommit_interval = config['kafka']['auto.commit.interval.ms']
  14.  
  15. #HDFS configuration
  16. hdfs_path = config['hdfs']['path']
  17. hdfs_port = config['hdfs']['port']
  18.  
  19. sc = SparkContext(appName="kafkaConsumer")
  20. sc.setLogLevel("WARN")
  21. spark = SparkSession(sc)
  22.  
  23. df = spark.read \
  24. .format("kafka") \
  25. .option("subscribe", topics) \
  26. .option("kafka.bootstrap.servers", bootstrapserver) \
  27. .option("enable.auto.commit", autocommit) \
  28. .option("auto.commit.interval.ms", autocommit_interval) \
  29. .load()
  30.  
  31. #df.write.format("csv").save(hdfs_path)
  32. df.printSchema
  33. print("test")
  34. df.show
  35. print(df)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement