from __future__ import print_function

import json
from datetime import datetime

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    app_name = "streamingSparkEt%s" % datetime.today().strftime("%y%m%d_%H%M")
    n_cores = 2
    n_executors = 10
    n_partitions = n_cores * n_executors
    conf = (SparkConf()
            .setAppName(app_name)
            # Note: spark.driver.memory only takes effect from SparkConf in
            # cluster mode; in client mode pass --driver-memory to spark-submit.
            .set("spark.driver.memory", "3G")
            .set("spark.executor.cores", str(n_cores))
            .set("spark.executor.memory", "3G")
            .set("spark.default.parallelism", str(n_partitions)))
    sc = SparkContext(conf=conf)    # create a new context
    ssc = StreamingContext(sc, 10)  # 10-second micro-batches
    kafkaParams = {"metadata.broker.list": ...}  # 4 brokers
    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, ["EtngTest2"], kafkaParams)
    ssc.checkpoint("hdfs:///tmp/%s" % app_name)
    def flatten(js):
        """Explode the 'js2' sub-dict of a message into ((id, v), flag) pairs."""
        n = js['js1']['js3']
        e = js['js1']['js2']
        # Flag the record when js3 == 'bla' and a positive r.t is present;
        # default t to 0 so a missing key cannot raise on the comparison.
        if n == 'bla' and js.get('r', {}).get('t', 0) > 0:
            b = 1
        else:
            b = 0
        es = []
        for ee in e:
            # Each entry is expected to be a 6-element sequence.
            v, f, l, g, eb, bc = e[ee]
            es.append(((int(ee), v), b))
        return es
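
    # A minimal sketch of the message shape flatten() assumes (field names
    # come from the code above; the concrete values are hypothetical):
    #
    #   {"js1": {"js3": "bla",
    #            "js2": {"17": ["v", "f", "l", "g", "eb", "bc"]}},
    #    "r": {"t": 1}}
    #
    # flatten() would turn this message into [((17, "v"), 1)].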
    stream_it = (directKafkaStream
                 # Kafka delivers (key, value) pairs; parse the JSON value.
                 .map(lambda kv: json.loads(kv[1]))
                 # Keep only messages that carry both sub-structures.
                 .filter(lambda js: js['js1'].get('js3') is not None
                         and js['js1'].get('js2') is not None)
                 # Re-key each message by its 'js3' field.
                 .map(lambda js: (js['js1']['js3'], js))
                 # Explode each value into ((id, v), flag) pairs.
                 .flatMapValues(flatten)
                 # Reshape (s, ((f1, f2), f3)) into ((s, f1, f2), f3).
                 .map(lambda kv: ((kv[0], kv[1][0][0], kv[1][0][1]), kv[1][1]))
                 # Sum the flags per (s, f1, f2) key within each batch.
                 .reduceByKey(lambda x, y: x + y)
                 # Print each aggregate (to stdout on the executors).
                 .foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))))
    ssc.start()
    ssc.awaitTermination()
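
# Usage sketch: KafkaUtils.createDirectStream comes from the Kafka 0.8
# direct-stream integration, so the matching package must be supplied at
# submit time. The artifact version below is an example and must match
# your Spark/Scala build; the script filename is hypothetical.
#
#   spark-submit \
#       --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
#       streaming_spark_et.py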