Advertisement
Guest User

Untitled

a guest
May 29th, 2015
243
0
Never
Not a member of Pastebin yet? Sign up; it unlocks many cool features!
text 1.03 KB | None | 0 0
  1. from kafka import KafkaClient, SimpleProducer, KafkaConsumer
  2. import time
  3.  
  4. host_port = "localhost:9092"
  5. topic = 'locust2'
  6.  
  7. client = KafkaClient(host_port)
  8.  
  9. # Instantiate Kafka client and producer objects
  10. producer = SimpleProducer(client,
  11. async=False,
  12. req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
  13. ack_timeout=2000)
  14. producer.send_messages(topic, 'Topic {0} here'.format(int(time.time())))
  15.  
  16. # Instantiate Kafka client and consumer objects
  17. #self.kafka_client = KafkaLocustClient("%s:%s" % (host, port))
  18. consumer = KafkaConsumer('locust2',
  19. metadata_broker_list=[host_port],
  20. fetch_message_max_bytes=10485760,
  21. group_id='locust_test',
  22. consumer_timeout_ms=2000)
  23.  
  24. print "Fetching..."
  25. m = consumer.next()
  26. print("%s:%d:%d: key=%s value=%s" % (m.topic, m.partition,
  27. m.offset, m.key, m.value))
  28.  
  29. consumer.task_done(m)
  30. consumer.commit()
  31.  
  32. print("Done!")
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement