Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- from typing import Optional
- import structlog
- from influxdb_client.client.write.point import Point
- from influxdb_client.domain.write_precision import WritePrecision
- from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
- logger = structlog.get_logger()
- bucket = "crypto"
- org = "morington"
- token = "7rbrLc5gBW1-f_pqzZZLV2ZhnnL80im4UzWNtBdYeEft2pgYyo1xO6jtFk7Z33ACilyZNNmoyk6xGdVxoIusUw=="
- url = "http://localhost:8086"
- def generate_data_point(tags: dict, fields: dict, timestamp: Optional[int] = None) -> dict:
- """
- Генерирует словарь для записи данных в InfluxDB.
- Params:
- - tags: dict - Словарь тегов, где ключ - название тега, значение - значение тега.
- - fields: dict - Словарь полей, где ключ - название поля, значение - значение поля.
- - timestamp: int - Временная метка для точки данных. Опционально.
- Returns:
- dict - Словарь с данными для записи.
- """
- data_point = {}
- for tag, value in tags.items():
- data_point[f"tag_{tag}"] = value
- data_point.update(fields)
- if timestamp:
- data_point["time"] = timestamp
- return data_point
- async def write_to_influxdb(measurement: str, data_points: list[dict]) -> None:
- """
- Асинхронная универсальная функция для записи данных в InfluxDB.
- Params:
- - measurement: Название (measurement) для записи данных.
- - data_points: Список словарей с данными для записи.
- """
- logger.debug('Write data to InfluxDB', data=data_points)
- async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
- points = []
- for item in data_points:
- point = Point(measurement)
- # Если не указано временная метка для записи
- if 'time' not in item:
- point = point.time(WritePrecision.MS)
- for key, value in item.items():
- if key == "time":
- point = point.time(value, WritePrecision.MS)
- elif key.startswith("tag_"):
- point = point.tag(key[4:], value)
- else:
- point = point.field(key, value)
- points.append(point)
- await client.write_api().write(bucket=bucket, record=point)
- logger.debug('Write data to InfluxDB', count=len(points))
- async def fetch_data(measurement: str, start: str, stop: str = 'now()'):
- """
- Асинхронная функция для получения данных из InfluxDB.
- :param bucket: Имя бакета в InfluxDB.
- :param measurement: Имя измерения в InfluxDB.
- :param start: Начальная точка временного диапазона (например, '-1h', '-1d').
- :param stop: Конечная точка временного диапазона (по умолчанию 'now()').
- """
- async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
- query_api = client.query_api()
- query = f'''
- from(bucket: "{bucket}")
- |> range(start: {start}, stop: {stop})
- |> filter(fn: (r) => r._measurement == "{measurement}")
- '''
- result = await query_api.query(query)
- logger.debug('Fetch data from InfluxDB', result=result)
- for table in result:
- for record in table.records:
- logger.debug(f'Time: {record.get_time()}, Value: {record.get_value()}')
- async def main():
- await fetch_data('fundingRate', '-1d')
- if __name__ == "__main__":
- asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement