spark_streaming_job

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from kafka import KafkaProducer
import sys
import time

# Python 2: make UTF-8 the default encoding so tweet text containing
# non-ASCII characters can be handled without explicit decoding everywhere.
reload(sys)
sys.setdefaultencoding('utf8')

# "Broker_list" is a placeholder; replace it with the actual Kafka broker
# addresses, e.g. "host1:9092,host2:9092".
producer = KafkaProducer(bootstrap_servers="Broker_list")

def send_to_kafka(rdd):
    # Pull the batch back to the driver and publish each record to Kafka.
    tweets = rdd.collect()
    for tweet in tweets:
        print tweet
        producer.send('test_historical_job', value=tweet.encode('utf-8'))

if __name__ == "__main__":

    conf = SparkConf().setAppName("TestSparkFromPython")

    sc = SparkContext(conf=conf)

    # Micro-batch interval of 1 second.
    ssc = StreamingContext(sc, 1)

    # Watch the HDFS directory; each file that appears becomes part of a batch.
    tweetsDstream = ssc.textFileStream("hdfs://10.62.54.254:8020/tmp/historical_jobs/")

    tweetsDstream.foreachRDD(lambda rdd: send_to_kafka(rdd))

    ssc.start()

    ssc.awaitTermination()
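
To sanity-check that the job is actually publishing, a quick read-back from the topic can help. The following is a minimal sketch using kafka-python's KafkaConsumer; the topic name and the "Broker_list" placeholder come from the script above, while the offset reset and timeout settings are assumptions, not part of the original job.

# Hedged sketch: consume from the topic the streaming job writes to and print
# whatever arrives. "Broker_list" is the same placeholder as above; replace it
# with real broker addresses. consumer_timeout_ms makes the loop exit after
# 10 seconds of inactivity instead of blocking forever.
from kafka import KafkaConsumer

consumer = KafkaConsumer('test_historical_job',
                         bootstrap_servers="Broker_list",
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=10000)

for message in consumer:
    print message.value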