Guest User

Untitled

a guest
Mar 21st, 2016
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.15 KB | None | 0 0
  1. from __future__ import print_function
  2. import json
  3. import sys
  4. from datetime import datetime
  5. from operator import itemgetter
  6.  
  7. from pyspark.sql import Row
  8. from pyspark.sql import HiveContext, SQLContext
  9. from pyspark.streaming.kafka import KafkaUtils, OffsetRange
  10. from pyspark.streaming import StreamingContext
  11. from pyspark import SparkConf, SparkContext
  12. from pyspark.storagelevel import StorageLevel
  13.  
  14.  
  15. if __name__ == "__main__":
  16.   app_name = "streamingSpark%s" % datetime.today().strftime("%y%m%d_%H%M")
  17.   n_cores = 2
  18.   n_executors = 10
  19.   n_partitions = n_cores * n_executors
  20.  
  21.   conf = (SparkConf()
  22.     .setAppName(app_name)
  23.     .set("spark.executor.cores" ,"%s" % n_cores)
  24.     )
  25.  
  26.   sc = SparkContext(conf=conf) # Create new context
  27.   ssc = StreamingContext(sc, 10)
  28.  
  29.   kafkaParams = {"metadata.broker.list": ...} # 4 brokers
  30.   directKafkaStream = KafkaUtils.createDirectStream(ssc
  31.     , ["Test"]
  32.     , kafkaParams)
  33.  
  34.   stream_it = (directKafkaStream
  35.     .map(lambda (key, js_string): json.loads(js_string))
  36.     .map(lambda js: js.keys())
  37.     .foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x) ))
  38.     )
  39.  
  40.   ssc.start()
  41.   ssc.awaitTermination()
Advertisement
Add Comment
Please, Sign In to add comment