Advertisement
Guest User

Untitled

a guest
Mar 21st, 2016
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.86 KB | None | 0 0
  1. from __future__ import print_function
  2. import json
  3. import sys
  4. from datetime import datetime
  5. from operator import itemgetter
  6.  
  7. from pyspark.sql import Row
  8. from pyspark.sql import HiveContext, SQLContext
  9. from pyspark.streaming.kafka import KafkaUtils, OffsetRange
  10. from pyspark.streaming import StreamingContext
  11. from pyspark import SparkConf, SparkContext
  12. from pyspark.storagelevel import StorageLevel
  13.  
  14.  
  15. if __name__ == "__main__":
  # The application name carries a minute-resolution launch timestamp; the
  # same name is reused below for the checkpoint directory.
  launch_stamp = datetime.today().strftime("%y%m%d_%H%M")
  app_name = "streamingSparkEt%s" % launch_stamp

  # Sizing knobs. n_executors is only used here to derive the default
  # parallelism; this script does not pass an executor count to Spark itself.
  n_cores = 2
  n_executors = 10
  n_partitions = n_cores * n_executors

  # SparkConf setters mutate the conf in place, so sequential calls build the
  # same configuration as one chained expression.
  conf = SparkConf()
  conf.setAppName(app_name)
  conf.set("spark.driver.memory", "3G")
  conf.set("spark.executor.cores", str(n_cores))
  conf.set("spark.executor.memory", "3G")
  conf.set("spark.default.parallelism", str(n_partitions))

  batch_interval_seconds = 10
  sc = SparkContext(conf=conf) # Create new context
  ssc = StreamingContext(sc, batch_interval_seconds)

  # The broker list is elided in this paste; the `...` placeholder must be
  # replaced with the real "host:port,..." string before running.
  kafkaParams = {"metadata.broker.list": ...} # 4 brokers
  topics = ["EtngTest2"]
  directKafkaStream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)

  # Checkpoint data goes to a per-application HDFS directory.
  checkpoint_dir = "hdfs:///tmp/%s" % app_name
  ssc.checkpoint(checkpoint_dir)
  38.  
  def flatten(js):
    """Expand one parsed message into ``((int(key), value), flag)`` pairs.

    ``js['js1']['js2']`` must be a mapping whose keys are convertible with
    ``int()`` and whose values are sequences of exactly six fields; only the
    first field is kept.  The flag is 1 when ``js['js1']['js3'] == 'bla'``
    and ``js['r']['t']`` is present and greater than 0, otherwise 0.  The
    same flag is attached to every pair produced from the message.

    Raises ``KeyError`` if ``js1``/``js3``/``js2`` are missing and
    ``ValueError`` if a key is not an integer string or an entry does not
    have exactly six fields.
    """
    label = js['js1']['js3']
    entries = js['js1']['js2']

    flag = 0
    if label == 'bla':
      # `or {}` also covers an explicit `"r": null` in the message.
      t = (js.get('r') or {}).get('t')
      # Explicit None check: comparing None with an int only "works" through
      # Python 2's arbitrary ordering and raises TypeError on Python 3.
      if t is not None and t > 0:
        flag = 1

    pairs = []
    for raw_key, fields in entries.items():
      # Unpack all six fields so a malformed entry still fails loudly, as
      # before; only the first one is used.
      value, _f, _l, _g, _eb, _bc = fields
      pairs.append(((int(raw_key), value), flag))
    return pairs
  53.  
  def has_payload(js):
    """Return True when the message carries both js1.js3 and js1.js2."""
    # A message without 'js1' is dropped here rather than raising KeyError
    # inside the filter and failing the whole batch.
    inner = js.get('js1') or {}
    return inner.get('js3') is not None and inner.get('js2') is not None

  def rekey(record):
    """Turn (s, ((f1, f2), f3)) into ((s, f1, f2), f3) for reduceByKey."""
    # Unpacking in the body replaces the tuple-parameter lambda, which is
    # Python-2-only syntax (removed by PEP 3113).
    s, ((f1, f2), f3) = record
    return ((s, f1, f2), f3)

  # Per-batch pipeline: parse the JSON value of each Kafka (key, value)
  # record, drop incomplete messages, key each message by its js3 value,
  # expand it with flatten(), then sum the flags per (js3, int key, value).
  stream_it = (directKafkaStream
    .map(lambda kv: json.loads(kv[1]))
    .filter(has_payload)
    .map(lambda js: (js['js1']['js3'], js))
    .flatMapValues(flatten)
    .map(rekey)
    .reduceByKey(lambda x, y: x + y)
    )

  # foreachRDD returns None, so it is called as a separate statement and
  # stream_it keeps referring to the reduced DStream.
  # NOTE(review): rdd.foreach runs on the executors, so on a cluster these
  # prints presumably land in executor stdout, not the driver's - confirm.
  stream_it.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x)))

  ssc.start()
  ssc.awaitTermination()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement