Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Minimal Spark Streaming job: read JSON messages from the Kafka topic
# 'twitter' and print how many arrived in each 60-second batch.
# The "#N" markers follow the original notebook-cell numbering.
#
# NOTE(review): the receiver-based Kafka 0.8 connector
# (pyspark.streaming.kafka / spark-streaming-kafka-0-8) only exists in
# Spark 2.x; the --packages coordinate below should match the cluster's
# Spark and Scala versions — confirm before running.

#1
import os

# Must be set before the SparkContext (and therefore the JVM) is created;
# otherwise the Kafka streaming connector is not put on the classpath.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 '
    'pyspark-shell'
)

#2
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

#3
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")

#4
# Micro-batch interval: 60 seconds.
ssc = StreamingContext(sc, 60)

#5
# Receiver-based stream: ZooKeeper quorum, consumer group id, and a
# {topic: number_of_consumer_threads} mapping.
kafkaStream = KafkaUtils.createStream(
    ssc,
    'cdh57-01-node-01.moffatt.me:2181',
    'spark-streaming',
    {'twitter': 1},
)

#6
# Each record is a (key, message) pair; parse the message body as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

#7
# Per batch, print the number of parsed records.
parsed.count().map(lambda x: 'Tweets in this batch: %s' % x).pprint()

#8
# Nothing above executes until the streaming context is started; block
# until it is stopped or fails.
ssc.start()
ssc.awaitTermination()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement