Advertisement
Guest User

Untitled

a guest
Jun 29th, 2016
82
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.90 KB | None | 0 0
  1. import asyncio, json, argparse
  2. from os import environ as env
  3. from aiokafka import AIOKafkaProducer
  4.  
  5. from util import logging
  6. from util.kafka.etl import Producer
  7.  
  8. import logging
  9.  
  10. logger = logging.getLogger(__name__)
  11.  
  12. parser = argparse.ArgumentParser(description='Kafka Write Probe')
  13. parser.add_argument('--topic', required=True, help='topic to write to')
  14. parser.add_argument('--payload', required=True)
  15. parser.add_argument('--count', type=int, required=True)
  16. args = parser.parse_args()
  17.  
  18. conf = dict(
  19. kafka_in_topic=args.topic,
  20. bootstrap_servers=env['KAFKA_URL']
  21. )
  22.  
  23. loop = asyncio.get_event_loop()
  24.  
  25. producer = Producer(loop, **conf)
  26. producer.start()
  27.  
  28. async def produce(loop):
  29. await producer.prod.send(
  30. args.topic,
  31. str.encode(args.payload)
  32. )
  33.  
  34. for i in range(args.count):
  35. if i % 1000 == 0:
  36. logger.info(i)
  37.  
  38. loop.run_until_complete(produce(loop))
  39.  
  40. producer.stop()
  41. loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement