Guest User

Untitled

a guest
Nov 14th, 2018
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.26 KB | None | 0 0
  1. #!/usr/bin/env python
  2. from __future__ import print_function
  3.  
  4. import json
  5. import os
  6. import random
  7. import time
  8. import sys
  9. from kafka import KafkaProducer
  10.  
  11. KAFKA_TOPIC = "brewery_source"
  12. KAFKA_BROKERS = "enter_your_connect_string_here:port"
  13.  
  14. # Setup producer connection
  15. producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode("utf-8"),
  16. bootstrap_servers=KAFKA_BROKERS)
  17.  
  18. print("connected to {} topic {}".format(KAFKA_BROKERS, KAFKA_TOPIC))
  19.  
  20.  
  21. def get_sensor():
  22. """Return a random temperature between 30 and 90."""
  23. return random.randrange(30, 90)
  24.  
  25.  
  26. def sendto_eventador(payload):
  27. """Add a message to the produce buffer asynchronously to be sent to Eventador."""
  28. try:
  29. producer.send(KAFKA_TOPIC, payload)
  30. except:
  31. print("unable to produce to {} topic {}".format(KAFKA_BROKERS, KAFKA_TOPIC))
  32.  
  33.  
  34. payload = {}
  35. while True:
  36. try:
  37. # produce dummy data until ctrl + c
  38. sensors = ["MashTun1", "MashTun2"]
  39.  
  40. for sensor in sensors:
  41. payload = {"sensor": sensor, "temp": get_sensor()}
  42. sendto_eventador(payload)
  43. print(payload)
  44.  
  45. # Flush the produce buffer and send to kafka
  46. producer.flush()
  47. time.sleep(3)
  48.  
  49. except KeyboardInterrupt:
  50. sys.exit()
Add Comment
Please, Sign In to add comment