Configurations in spark-defaults.conf:
spark.driver.memory 2g
spark.driver.maxResultSize 3g
spark.driver.cores 5
spark.executor.memory 8g
# 2 x the total number of cores in the cluster (inline comments are not
# supported in the properties format, so this note is on its own line)
spark.default.parallelism 160
spark.streaming.backpressure.enabled true
spark.streaming.backpressure.pid.minRate 1800
spark.streaming.receiver.maxRate 4500
spark.worker.cleanup.enabled true
spark.worker.cleanup.appDataTtl 172800
spark.executor.cores 16
spark.scheduler.mode FAIR
spark.locality.wait 100ms
spark.speculation true
spark.cleaner.ttl 3600s
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max 128m
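
The same properties can also be set programmatically when building the SparkConf; below is a minimal sketch of the equivalent (streaming-related subset of the keys above). Note that spark.driver.memory only takes effect if set before the driver JVM starts, so it belongs in spark-defaults.conf or on the spark-submit command line rather than in code:

from pyspark import SparkConf

# Equivalent programmatic configuration (subset of the keys listed above).
conf = (SparkConf()
        .set("spark.streaming.backpressure.enabled", "true")
        .set("spark.streaming.backpressure.pid.minRate", "1800")
        .set("spark.streaming.receiver.maxRate", "4500")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryoserializer.buffer.max", "128m"))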
Python Code:
from __future__ import print_function

import sys
import json

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

brokers = None
topic = None

def getImageUrl(line):
    # Parse one JSON record and key it by image URL.
    line = json.loads(line)
    return (line['imageUrl'], {'count': 1, 'source': line['source']})

def reduceRows(a, b):
    # Merge counts for the same image URL inside the window.
    a['count'] += b['count']
    return a

def invReduceRows(a, b):
    # Inverse reduce: subtract counts for records sliding out of the window.
    a['count'] -= b['count']
    return a

def sendPartition(iterator):
    for item in iterator:
        if item[1]['count'] > 50:
            # Check the source and save to another Kafka queue
            # (see the sketch after this listing).
            pass

def createContext(brokers, topic):
    conf = SparkConf()
    sc = SparkContext(appName="KafkaVirality", conf=conf)
    ssc = StreamingContext(sc, 30)
    # Checkpointing is required both for reduceByKeyAndWindow with an
    # inverse function and for StreamingContext.getOrCreate to recover.
    ssc.checkpoint('hdfs://hadoop-1:9000/spark_checkpoint')
    print("%s, %s" % (brokers, topic))
    kvs = KafkaUtils.createDirectStream(ssc, [topic],
                                        {"metadata.broker.list": brokers,
                                         "group.id": "spark1"})
    lines = kvs.map(lambda x: x[1])
    # 1800 s window, sliding every 60 s (both multiples of the 30 s batch interval).
    counts = lines.map(getImageUrl) \
                  .reduceByKeyAndWindow(reduceRows, invReduceRows, 1800, 60)
    counts.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
    return ssc

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafkastream.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(1)
    brokers, topic = sys.argv[1:]
    print("%s, %s" % (brokers, topic))
    checkpointDir = 'hdfs://hadoop-1:9000/spark_checkpoint'
    ssc = StreamingContext.getOrCreate(checkpointDir,
                                       lambda: createContext(brokers, topic))
    ssc.start()
    ssc.awaitTermination()
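
The body of sendPartition is only sketched as a comment above. One way to complete it, as a hedged sketch assuming the third-party kafka-python package and a hypothetical output topic named viral-images (neither is specified in the original):

from kafka import KafkaProducer  # assumption: kafka-python is installed on the executors
import json

def sendPartition(iterator):
    # One producer per partition, created on the executor; 'brokers' is the
    # module-level global captured in the function's closure.
    producer = KafkaProducer(bootstrap_servers=brokers.split(','))
    for url, meta in iterator:
        if meta['count'] > 50:
            # Forward items that crossed the virality threshold to another topic.
            payload = json.dumps({'imageUrl': url,
                                  'count': meta['count'],
                                  'source': meta['source']})
            producer.send('viral-images', payload.encode('utf-8'))
    producer.flush()
    producer.close()

The job itself would be launched with spark-submit together with the spark-streaming-kafka package matching the cluster's Spark version, passing <broker_list> and <topic> as the two arguments.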