Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
from pyspark import SparkContext
from pyspark.sql import SparkSession
import configparser

# Load the consumer's runtime settings from its INI file.
iniFileLocation = '/home/consumer/config.ini'
config = configparser.ConfigParser()
config.read(iniFileLocation)

# Kafka configuration
bootstrapserver = config['kafka']['kafka.bootstrap.servers']
topics = config['kafka']['subscribe']
autocommit = config['kafka']['enable.auto.commit']
autocommit_interval = config['kafka']['auto.commit.interval.ms']

# HDFS configuration (hdfs_path is only used by the commented-out write below;
# hdfs_port is currently unused — kept for forward compatibility with config)
hdfs_path = config['hdfs']['path']
hdfs_port = config['hdfs']['port']

sc = SparkContext(appName="kafkaConsumer")
sc.setLogLevel("WARN")
spark = SparkSession(sc)

# Batch read from Kafka: consumes the currently-available records once.
# NOTE(review): for a continuous consumer this would be spark.readStream —
# confirm batch semantics are intended.
df = spark.read \
    .format("kafka") \
    .option("subscribe", topics) \
    .option("kafka.bootstrap.servers", bootstrapserver) \
    .option("enable.auto.commit", autocommit) \
    .option("auto.commit.interval.ms", autocommit_interval) \
    .load()

#df.write.format("csv").save(hdfs_path)

# BUG FIX: printSchema and show are methods; the original referenced them
# without parentheses, so they were never invoked and produced no output.
df.printSchema()
print("test")
df.show()
print(df)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement