Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Round-trip smoke test against a Kafka broker: publish one timestamped
# message to `topic`, then fetch a single message back from the same topic,
# print it, and commit its offset.
from kafka import KafkaClient, SimpleProducer, KafkaConsumer
import time

host_port = "localhost:9092"
topic = 'locust2'

# Instantiate Kafka client and producer objects.
client = KafkaClient(host_port)
# An explicit `async=False` used to be passed here.  `async` is a reserved
# word from Python 3.7 on, so writing it as a keyword argument is a
# SyntaxError there.  Synchronous sending is SimpleProducer's default, so the
# argument is left out rather than renamed (the replacement keyword differs
# between kafka-python releases).
producer = SimpleProducer(client,
                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                          ack_timeout=2000)

# kafka-python only accepts bytes payloads.  Encoding leaves this ASCII text
# unchanged on Python 2 and avoids a TypeError on Python 3.
payload = 'Topic {0} here'.format(int(time.time())).encode('utf-8')
producer.send_messages(topic, payload)

# Instantiate the consumer on the same topic and broker the producer used.
consumer = KafkaConsumer(topic,
                         metadata_broker_list=[host_port],
                         fetch_message_max_bytes=10485760,  # 10 * 1024 * 1024
                         group_id='locust_test',
                         consumer_timeout_ms=2000)

print("Fetching...")
# The built-in next() resolves to .next() on Python 2 and .__next__() on
# Python 3, so it works with either interpreter.
# NOTE(review): presumably this raises if no message arrives within
# consumer_timeout_ms; that case is not handled here -- confirm against the
# installed kafka-python version before relying on this script unattended.
m = next(consumer)
print("%s:%d:%d: key=%s value=%s" % (m.topic, m.partition,
                                     m.offset, m.key, m.value))

# Mark the message as processed, then persist the offset for the group.
consumer.task_done(m)
consumer.commit()
print("Done!")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement