Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- kafka.py - tests the local Kafka broker by producing a message and consuming it back
- """
- import datetime
- import json
- import os
- from kafka import KafkaProducer, KafkaConsumer, TopicPartition
# Broker address; host and port can each be overridden through the environment.
kafka_host = os.getenv('KAFKA_HOST', 'localhost')
kafka_port = os.getenv('KAFKA_PORT', '9092')
URL = '{}:{}'.format(kafka_host, kafka_port)

# Topic and client id used by the tester, also overridable through the environment.
TOPIC = os.getenv('KAFKA_TOPIC', 'test')
CLIENT_ID = os.getenv('KAFKA_CLIENT_ID', 'tester')
class JsonSerializer(object):
    """
    Converts values to and from UTF-8 encoded JSON bytes
    """

    @staticmethod
    def serialize(value):
        """
        Dumps the value to JSON text and returns it encoded as UTF-8 bytes
        """
        as_text = json.dumps(value)
        return as_text.encode('utf-8')

    @staticmethod
    def deserialize(value):
        """
        Decodes UTF-8 bytes and returns the parsed JSON value
        """
        as_text = value.decode('utf-8')
        return json.loads(as_text)
class Tester(object):
    """
    Component that provides a produce and consume behavior

    Each method builds its own client from the module-level URL and
    CLIENT_ID settings and closes it before returning, including when the
    broker interaction raises.
    """

    def produce(self):
        """
        Produces a message on kafka with the current datetime

        Returns the metadata object resolved from the send future; its
        ``offset`` is printed here and is what ``consume`` seeks to.

        Raises whatever ``future_metadata.get(timeout=10)`` raises when the
        send does not complete; the producer is still closed in that case.
        """
        producer = KafkaProducer(
            bootstrap_servers=[URL],
            client_id=CLIENT_ID,
            value_serializer=JsonSerializer.serialize,
            api_version=(0, 10, 1),
            batch_size=0,
            acks='all'
        )
        try:
            message = datetime.datetime.now().isoformat()
            future_metadata = producer.send(TOPIC, message)
            producer.flush()
            metadata = future_metadata.get(timeout=10)
            print('Produced this message (offset={}): {}'.format(metadata.offset, message))
            return metadata
        finally:
            # Release the client even if send/flush/get raised.
            producer.close()

    def consume(self, metadata):
        """
        Consumes the message described by ``metadata``

        ``metadata`` must expose ``offset``. When it also exposes ``topic``
        and ``partition`` (as the object returned by ``produce`` is expected
        to - see the kafka-python RecordMetadata docs), those are used to
        locate the record; otherwise the module-level TOPIC and partition 0
        are used.
        """
        consumer = KafkaConsumer(
            bootstrap_servers=URL,
            client_id=CLIENT_ID,
            value_deserializer=JsonSerializer.deserialize,
            api_version=(0, 10, 1)
        )
        try:
            # An offset is only meaningful within the partition the record
            # was written to, so read from that partition rather than
            # always assuming partition 0.
            partition = TopicPartition(
                getattr(metadata, 'topic', TOPIC),
                getattr(metadata, 'partition', 0)
            )
            consumer.assign([partition])
            consumer.seek(partition, metadata.offset)
            # No consumer timeout is configured here, so this waits for a record.
            message = next(consumer)
            print('Consumed this message (offset={}): {}'.format(metadata.offset, message))
        finally:
            # Release the client even if assign/seek/next raised.
            consumer.close()
def main():
    """
    Script entrypoint: produces one message, then consumes it back
    """
    kafka_tester = Tester()
    produced_metadata = kafka_tester.produce()
    kafka_tester.consume(produced_metadata)
# Run the produce/consume round trip only when executed as a script.
if __name__ == '__main__':
    main()
Add Comment
Please sign in to add a comment.