Guest User

Untitled

a guest
Jan 18th, 2018
105
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.12 KB | None | 0 0
  1. """
  2. kafka.py - tests the local kafka broker
  3. """
  4. import datetime
  5. import json
  6. import os
  7.  
  8. from kafka import KafkaProducer, KafkaConsumer, TopicPartition
  9.  
  10.  
  11. kafka_host = os.environ.get('KAFKA_HOST', 'localhost')
  12. kafka_port = os.environ.get('KAFKA_PORT', '9092')
  13.  
  14. TOPIC = os.environ.get('KAFKA_TOPIC', 'test')
  15. URL = '{}:{}'.format(kafka_host, kafka_port)
  16. CLIENT_ID = os.environ.get('KAFKA_CLIENT_ID', 'tester')
  17.  
  18.  
  19. class JsonSerializer(object):
  20. @staticmethod
  21. def serialize(value):
  22. return json.dumps(value).encode('utf-8')
  23.  
  24. @staticmethod
  25. def deserialize(value):
  26. return json.loads(value.decode('utf-8'))
  27.  
  28.  
  29. class Tester(object):
  30. """
  31. Component that provides a produce and consume behavior
  32. """
  33. def produce(self):
  34. """
  35. Produces a message on kafka with the current datetime
  36. """
  37. producer = KafkaProducer(
  38. bootstrap_servers=[URL],
  39. client_id=CLIENT_ID,
  40. value_serializer=JsonSerializer.serialize,
  41. api_version=(0, 10, 1),
  42. batch_size=0,
  43. acks='all'
  44. )
  45.  
  46. message = datetime.datetime.now().isoformat()
  47. future_metadata = producer.send(TOPIC, message)
  48. producer.flush()
  49. metadata = future_metadata.get(timeout=10)
  50. print('Produced this message (offset={}): {}'.format(metadata.offset, message))
  51. producer.close()
  52.  
  53. return metadata
  54.  
  55. def consume(self, metadata):
  56. """
  57. Consumes a message
  58. """
  59. consumer = KafkaConsumer(
  60. bootstrap_servers=URL,
  61. client_id=CLIENT_ID,
  62. value_deserializer=JsonSerializer.deserialize,
  63. api_version=(0, 10, 1)
  64. )
  65.  
  66. partition = TopicPartition(TOPIC, 0)
  67. consumer.assign([partition])
  68. consumer.seek(partition, metadata.offset)
  69. message = next(consumer)
  70. print('Consumed this message (offset={}): {}'.format(metadata.offset, message))
  71. consumer.close()
  72.  
  73.  
  74. def main():
  75. """
  76. main entrypoint
  77. """
  78. tester = Tester()
  79. message_info = tester.produce()
  80. tester.consume(message_info)
  81.  
  82.  
# Run the round trip only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Add Comment
Please, Sign In to add comment