Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio, json, argparse
- from os import environ as env
- from aiokafka import AIOKafkaProducer
- from util import logging
- from util.kafka.etl import Producer
- import logging
- logger = logging.getLogger(__name__)
- parser = argparse.ArgumentParser(description='Kafka Write Probe')
- parser.add_argument('--topic', required=True, help='topic to write to')
- parser.add_argument('--payload', required=True)
- parser.add_argument('--count', type=int, required=True)
- args = parser.parse_args()
- conf = dict(
- kafka_in_topic=args.topic,
- bootstrap_servers=env['KAFKA_URL']
- )
- loop = asyncio.get_event_loop()
- producer = Producer(loop, **conf)
- producer.start()
- async def produce(loop):
- await producer.prod.send(
- args.topic,
- str.encode(args.payload)
- )
- for i in range(args.count):
- if i % 1000 == 0:
- logger.info(i)
- loop.run_until_complete(produce(loop))
- producer.stop()
- loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement