Advertisement
Guest User

Untitled

a guest
Aug 20th, 2019
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.99 KB | None | 0 0
  1. from pyspark import SparkContext
  2. from pyspark.streaming import StreamingContext
  3. from pyspark.streaming.kafka import KafkaUtils
  4. from pykafka import KafkaClient
  5. import json
  6. import sys
  7. import pprint
  8.  
  def pushOrderStatusInKafka(status_counts):
      """Publish every record of one partition to Kafka as a JSON message.

      Meant to be handed to ``rdd.foreachPartition``: it receives the
      partition's records as an iterable, so the Kafka connection is opened
      where the function runs rather than being shipped from the driver.

      Args:
          status_counts: Iterable of JSON-serialisable records. The caller
              in this script passes ``(order_status, count)`` tuples, which
              ``json.dumps`` renders as two-element JSON arrays.
      """
      client = KafkaClient(hosts="localhost:9092")
      topic = client.topics['order-one-min-data']
      # Open a single producer for the whole partition. Creating one per
      # record forces a connect/flush/teardown cycle for every message.
      with topic.get_producer() as producer:
          for status_count in status_counts:
              # pykafka only accepts bytes payloads; json.dumps returns text
              # on Python 3. The default output is pure ASCII, so encoding
              # is also a safe no-op change on Python 2.
              producer.produce(json.dumps(status_count).encode("utf-8"))
  15.  
  # Driver script: count order statuses per micro-batch from a Kafka topic and
  # republish the counts to Kafka via pushOrderStatusInKafka.

  # Fail with a readable usage message instead of an opaque unpacking error
  # when the two required arguments are not supplied.
  if len(sys.argv) != 3:
      sys.exit("Usage: %s <zkQuorum> <topic>" % sys.argv[0])

  zkQuorum, topic = sys.argv[1:]
  sc = SparkContext(appName="KafkaOrderCount")
  ssc = StreamingContext(sc, 15)  # second argument is the batch interval in seconds
  kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
  # Each stream element is indexed with [1] to obtain the line text.
  lines = kvs.map(lambda x: x[1])
  # The chain is wrapped in parentheses so the continuation lines that start
  # with "." parse as one expression.
  status_count = (
      lines.map(lambda line: line.split(",")[2])  # third comma-separated field
      .map(lambda order_status: (order_status, 1))
      .reduceByKey(lambda a, b: a+b)
  )
  status_count.pprint()
  status_count.foreachRDD(lambda rdd: rdd.foreachPartition(pushOrderStatusInKafka))
  ssc.start()
  ssc.awaitTermination()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement