Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import logging
- import asyncio
- from datetime import datetime
- from aioinflux import AsyncInfluxDBClient
- logging.basicConfig(format=u'%(levelname)-8s [%(asctime)s] %(message)s',
- level=logging.INFO)
- DB_NAME = 'testname'
- TEST_TEXT = '{"user": 12345123, "message": "тестовый текст сообщения"}'
- def get_json_or_false(text):
- try:
- jsn = json.loads(text)
- except ValueError as e:
- logging.error(f'Error parsing json!:\n{e}')
- return False
- else:
- return jsn
- async def process_new_data(text_line):
- my_json = get_json_or_false(text_line)
- if not my_json:
- return
- point = dict(time=datetime.now(),
- measurement='some_measurement_lol',
- fields=my_json)
- client = AsyncInfluxDBClient(db=DB_NAME)
- return await client.write(point)
- loop = asyncio.get_event_loop()
- loop.run_until_complete(process_new_data(TEST_TEXT))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement