Untitled

a guest
Feb 23rd, 2019
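from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# KafkaUtils comes from the receiver-based Kafka 0.8 integration
# (Spark 1.x/2.x); pyspark.streaming.kafka was removed in Spark 3.0.
from pyspark.streaming.kafka import KafkaUtils
import json

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")

# ssc was used below but never created. The 60-second batch interval
# is an assumption; pick whatever suits the workload.
ssc = StreamingContext(sc, 60)

# Arguments: ZooKeeper quorum, consumer group id, {topic: receiver threads}
kafkaStream = KafkaUtils.createStream(ssc, 'cdh57-01-node-01.moffatt.me:2181', 'spark-streaming', {'twitter': 1})

# Each record is a (key, value) tuple; the value holds the tweet as JSON
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

# Count the records in each batch and print the result to stdout
parsed.count().map(lambda x: 'Tweets in this batch: %s' % x).pprint()

ssc.start()
# Stop waiting after at most 180 seconds
ssc.awaitTermination(timeout=180)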
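What each micro-batch does can be checked locally without Spark or Kafka. The sketch below is a plain-Python approximation, not the Spark API: it assumes records arrive as (key, value) tuples with a JSON string as the value, as in `kafkaStream.map(lambda v: json.loads(v[1]))`, and then formats the count the same way the job does. The function name `count_tweets_in_batch` and the sample records are hypothetical.

```python
import json


def count_tweets_in_batch(records):
    """Approximate one micro-batch of the streaming job (hypothetical helper).

    records: iterable of (key, value) tuples, value being a JSON string.
    """
    # Mirrors kafkaStream.map(lambda v: json.loads(v[1]))
    parsed = [json.loads(value) for _key, value in records]
    # Mirrors parsed.count().map(lambda x: 'Tweets in this batch: %s' % x)
    return 'Tweets in this batch: %s' % len(parsed)


# Hypothetical batch of Kafka records; keys are often None
batch = [
    (None, '{"id": 1, "text": "hello"}'),
    (None, '{"id": 2, "text": "world"}'),
]
print(count_tweets_in_batch(batch))  # Tweets in this batch: 2
```

Parsing every value (instead of only counting raw records) keeps the sketch faithful to the job. A malformed JSON payload raises an error here, just as it would fail the Spark task.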