spark kafka streaming
Configurations in spark-defaults.conf:

spark.driver.memory 2g
spark.driver.maxResultSize 3g
spark.driver.cores 5

spark.executor.memory 8g
# 2 * total number of cores in the cluster
spark.default.parallelism 160

spark.streaming.backpressure.enabled true
spark.streaming.backpressure.pid.minRate 1800
spark.streaming.receiver.maxRate 4500

spark.worker.cleanup.enabled true
spark.worker.cleanup.appDataTtl 172800

spark.executor.cores 16

spark.scheduler.mode FAIR
spark.locality.wait 100ms
spark.speculation true
spark.cleaner.ttl 3600s

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 128m
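
These cluster-wide defaults can also be overridden per application. A minimal sketch (the app name here is illustrative, not from the paste) that sets a couple of them on a SparkConf and prints the effective configuration:

from pyspark import SparkConf, SparkContext

# Per-application overrides; these take precedence over spark-defaults.conf.
conf = (SparkConf()
        .set("spark.streaming.backpressure.enabled", "true")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))

sc = SparkContext(appName="ConfCheck", conf=conf)

# getAll() returns the merged (key, value) pairs from the defaults file,
# spark-submit flags, and programmatic overrides.
for key, value in sorted(sc.getConf().getAll()):
    print("%s = %s" % (key, value))
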
Python Code:

from __future__ import print_function

import sys
import json

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Filled in from the command-line arguments in __main__.
brokers = None
topic = None
def getImageUrl(line):
    # Each Kafka message is a JSON object with at least
    # 'imageUrl' and 'source' fields.
    line = json.loads(line)
    return (line['imageUrl'], {'count': 1, 'source': line['source']})

def reduceRows(a, b):
    a['count'] += b['count']
    return a

def invReduceRows(a, b):
    a['count'] -= b['count']
    return a
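
A quick sanity check of getImageUrl and the reduce/inverse-reduce pair in plain Python, no Spark needed (the sample message is illustrative):

sample = '{"imageUrl": "http://example.com/a.jpg", "source": "web"}'
key, stats = getImageUrl(sample)
# key == 'http://example.com/a.jpg', stats == {'count': 1, 'source': 'web'}

merged = reduceRows(stats, {'count': 4, 'source': 'web'})
assert merged['count'] == 5   # a new batch slid into the window

shrunk = invReduceRows(merged, {'count': 2, 'source': 'web'})
assert shrunk['count'] == 3   # an old batch slid out of the window

Note that both reducers mutate their first argument in place and return it; this works with PySpark's pickled shuffle data, though a non-mutating version would be safer.
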
def sendPartition(iterator):
    for item in iterator:
        if item[1]['count'] > 50:
            # Check the source and save to another Kafka queue
            # (left as a stub in the original; see the sketch below).
            pass
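
One way to fill in that stub, as a sketch only: this assumes the kafka-python package and a hypothetical output topic "viral-images", neither of which is named in the paste:

from kafka import KafkaProducer

def sendPartition(iterator):
    # Create the producer inside the function so it lives on the executor;
    # Kafka clients are not serializable and cannot be shipped from the driver.
    # 'brokers' is the comma-separated broker list set in __main__.
    producer = KafkaProducer(bootstrap_servers=brokers.split(","))
    for url, stats in iterator:
        if stats['count'] > 50:
            payload = {'imageUrl': url, 'count': stats['count'], 'source': stats['source']}
            producer.send("viral-images", json.dumps(payload).encode("utf-8"))  # hypothetical topic
    producer.flush()
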
def createContext(brokers, topic):
    conf = SparkConf()

    sc = SparkContext(appName="KafkaVirality", conf=conf)
    ssc = StreamingContext(sc, 30)  # 30-second batch interval

    # reduceByKeyAndWindow with an inverse function requires checkpointing,
    # and the checkpoint directory must be set inside this factory so that
    # StreamingContext.getOrCreate can recover from it.
    ssc.checkpoint('hdfs://hadoop-1:9000/spark_checkpoint')

    print("%s, %s" % (brokers, topic))

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers, "group.id": "spark1"})

    lines = kvs.map(lambda x: x[1])
    # 30-minute sliding window, recomputed every 60 seconds; the inverse
    # function lets Spark subtract expired batches instead of rescanning
    # the whole window.
    counts = lines.map(getImageUrl) \
        .reduceByKeyAndWindow(reduceRows, invReduceRows, 1800, 60)

    counts.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafkastream.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(-1)

    brokers, topic = sys.argv[1:]
    print("%s, %s" % (brokers, topic))

    checkpointDir = 'hdfs://hadoop-1:9000/spark_checkpoint'

    # Reuse the context from the checkpoint if one exists; otherwise
    # build a fresh one via createContext.
    ssc = StreamingContext.getOrCreate(checkpointDir, lambda: createContext(brokers, topic))

    ssc.start()
    ssc.awaitTermination()
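
For a quick end-to-end test, a tiny producer (again assuming kafka-python; the broker address and topic name are placeholders) can feed messages in the shape getImageUrl expects:

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # placeholder broker
event = {'imageUrl': 'http://example.com/a.jpg', 'source': 'web'}
producer.send("images", json.dumps(event).encode("utf-8"))    # placeholder topic
producer.flush()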