Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Stream new files from an HDFS directory with Spark Streaming and forward
each record to a Kafka topic."""
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from kafka import KafkaProducer
import sys
import time

# Python 2 hack: force the default str encoding to UTF-8 so unicode payloads
# don't raise on implicit conversion.
# NOTE(review): reload()/sys.setdefaultencoding() do not exist on Python 3 —
# remove these two lines when porting.
reload(sys)
sys.setdefaultencoding('utf8')
# Single module-level producer shared by every micro-batch.
# Fixed: original called kafka.KafkaProducer(...), but only KafkaProducer is
# imported (`from kafka import KafkaProducer`) — `kafka` itself is unbound,
# so the original raised NameError at import time.
# NOTE(review): "Broker_list" is a placeholder — replace with real
# host:port broker addresses before running.
producer = KafkaProducer(bootstrap_servers="Broker_list")
def send_to_kafka(rdd):
    """Publish every record of *rdd* to the 'test_historical_job' Kafka topic.

    Collects the whole RDD onto the driver first, so this is only suitable
    for small micro-batches. Returns None.
    """
    tweets = rdd.collect()
    for tweet in tweets:
        # Echo each record for debugging/monitoring.
        print(tweet)
        # Encode explicitly: the original bytes(tweet) breaks on Python 3
        # (bytes(str) requires an encoding) and KafkaProducer expects bytes.
        producer.send('test_historical_job', value=tweet.encode('utf-8'))
if __name__ == "__main__":
    # Build a streaming context with a 1-second micro-batch interval that
    # watches the HDFS staging directory for new files and pushes each
    # batch's records to Kafka.
    conf = SparkConf().setAppName("TestSparkFromPython")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 1)  # batch interval: 1 second
    tweetsDstream = ssc.textFileStream("hdfs://10.62.54.254:8020/tmp/historical_jobs/")
    # Pass the handler directly — the `lambda rdd: send_to_kafka(rdd)`
    # wrapper in the original was redundant.
    tweetsDstream.foreachRDD(send_to_kafka)
    ssc.start()
    ssc.awaitTermination()  # block until the streaming job is stopped
Add Comment
Please, Sign In to add comment