from __future__ import print_function

import json
from datetime import datetime

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    # Unique application name per run, e.g. streamingSpark240101_1200.
    app_name = "streamingSpark%s" % datetime.today().strftime("%y%m%d_%H%M")
    n_cores = 2
    n_executors = 10
    n_partitions = n_cores * n_executors

    conf = (SparkConf()
            .setAppName(app_name)
            .set("spark.executor.cores", "%s" % n_cores))

    sc = SparkContext(conf=conf)    # create a new context
    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    kafkaParams = {"metadata.broker.list": ...}  # 4 brokers

    # Receiver-less direct stream reading from the "Test" topic.
    directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                      ["Test"],
                                                      kafkaParams)

    # Each record is a (key, value) pair whose value is a JSON string.
    parsed = directKafkaStream.map(lambda kv: json.loads(kv[1]))

    # Print the top-level keys of every message in each micro-batch.
    parsed.map(lambda js: list(js.keys())) \
          .foreachRDD(lambda rdd: rdd.foreach(print))

    ssc.start()
    ssc.awaitTermination()
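
The original paste also imports Row and SQLContext without using them; as a hedged sketch (not part of the original script), each parsed micro-batch could be turned into a DataFrame inside foreachRDD along these lines, where process_batch is a hypothetical helper name:

from pyspark.sql import Row, SQLContext

def process_batch(rdd):
    # Skip empty micro-batches.
    if rdd.isEmpty():
        return
    # Build a DataFrame from the batch of parsed JSON dicts and display it.
    sql_context = SQLContext(rdd.context)
    df = sql_context.createDataFrame(rdd.map(lambda d: Row(**d)))
    df.show()

# Usage: instead of printing keys, register the helper on the parsed stream:
#     parsed.foreachRDD(process_batch)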