Advertisement
Guest User

Untitled

a guest
Jun 20th, 2019
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.77 KB | None | 0 0
  """Print how many tweets arrive from Kafka in each Spark Streaming batch.

  The script builds a receiver-based Kafka input stream, decodes every
  message as JSON and registers an output operation that prints one
  'Tweets in this batch: N' line per batch.
  """

  import json
  import os
  import sys
  from importlib import reload

  # Put the Kafka 0.8 streaming connector on the submit arguments. This is
  # done before the SparkContext below is created so the setting is already
  # in the environment at that point.
  os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

  from pyspark import SparkContext
  from pyspark.streaming import StreamingContext
  from pyspark.streaming.kafka import KafkaUtils

  # Connection settings, kept in one place so they are easy to change.
  APP_NAME = "PythonSparkStreamingKafka_RM_01"
  BATCH_INTERVAL_SECONDS = 60
  ZOOKEEPER_QUORUM = 'cdh57-01-node-01.moffatt.me:2181'
  CONSUMER_GROUP_ID = 'spark-streaming'
  # Topic name -> number of partitions to consume for that topic.
  TOPIC_PARTITIONS = {'twitter': 1}

  # Spark context, with logging reduced to warnings and above.
  sc = SparkContext(appName=APP_NAME)
  sc.setLogLevel("WARN")

  # Streaming context that cuts the input into fixed-length batches.
  ssc = StreamingContext(sc, BATCH_INTERVAL_SECONDS)

  # Input stream reading the configured topic via ZooKeeper.
  kafkaStream = KafkaUtils.createStream(
      ssc,
      ZOOKEEPER_QUORUM,
      CONSUMER_GROUP_ID,
      TOPIC_PARTITIONS,
  )

  # Element 1 of each record is the message payload; decode it as JSON.
  parsed = kafkaStream.map(lambda v: json.loads(v[1]))

  # Register the per-batch count. This has to come after `parsed` is built:
  # referencing `parsed` any earlier raises NameError.
  parsed.count().map(lambda x: 'Tweets in this batch: %s' % x).pprint()

  # NOTE(review): nothing visible here calls ssc.start(), so the pipeline
  # above is only defined, not run -- confirm it is started elsewhere.

  # NOTE(review): presumably a leftover of the Python 2 `reload(sys)`
  # encoding workaround; nothing in this script depends on it -- confirm
  # and remove.
  reload(sys)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement