Advertisement
morington

Untitled

Feb 24th, 2024
718
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 3.90 KB | None | 0 0
  1. import asyncio
  2. from typing import Optional
  3.  
  4. import structlog
  5. from influxdb_client.client.write.point import Point
  6. from influxdb_client.domain.write_precision import WritePrecision
  7. from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
  8.  
  9.  
  10. logger = structlog.get_logger()
  11.  
  12.  
  13. bucket = "crypto"
  14. org = "morington"
  15. token = "7rbrLc5gBW1-f_pqzZZLV2ZhnnL80im4UzWNtBdYeEft2pgYyo1xO6jtFk7Z33ACilyZNNmoyk6xGdVxoIusUw=="
  16. url = "http://localhost:8086"
  17.  
  18.  
  19. def generate_data_point(tags: dict, fields: dict, timestamp: Optional[int] = None) -> dict:
  20.     """
  21.    Генерирует словарь для записи данных в InfluxDB.
  22.  
  23.    Params:
  24.        - tags: dict - Словарь тегов, где ключ - название тега, значение - значение тега.
  25.        - fields: dict - Словарь полей, где ключ - название поля, значение - значение поля.
  26.        - timestamp: int - Временная метка для точки данных. Опционально.
  27.  
  28.    Returns:
  29.        dict - Словарь с данными для записи.
  30.    """
  31.     data_point = {}
  32.     for tag, value in tags.items():
  33.         data_point[f"tag_{tag}"] = value
  34.     data_point.update(fields)
  35.     if timestamp:
  36.         data_point["time"] = timestamp
  37.  
  38.     return data_point
  39.  
  40.  
  41. async def write_to_influxdb(measurement: str, data_points: list[dict]) -> None:
  42.     """
  43.    Асинхронная универсальная функция для записи данных в InfluxDB.
  44.  
  45.    Params:
  46.        - measurement: Название (measurement) для записи данных.
  47.        - data_points: Список словарей с данными для записи.
  48.    """
  49.     logger.debug('Write data to InfluxDB', data=data_points)
  50.     async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
  51.         points = []
  52.         for item in data_points:
  53.             point = Point(measurement)
  54.  
  55.             # Если не указано временная метка для записи
  56.             if 'time' not in item:
  57.                 point = point.time(WritePrecision.MS)
  58.  
  59.             for key, value in item.items():
  60.                 if key == "time":
  61.                     point = point.time(value, WritePrecision.MS)
  62.                 elif key.startswith("tag_"):
  63.                     point = point.tag(key[4:], value)
  64.                 else:
  65.                     point = point.field(key, value)
  66.             points.append(point)
  67.  
  68.             await client.write_api().write(bucket=bucket, record=point)
  69.         logger.debug('Write data to InfluxDB', count=len(points))
  70.  
  71.  
  72. async def fetch_data(measurement: str, start: str, stop: str = 'now()'):
  73.     """
  74.    Асинхронная функция для получения данных из InfluxDB.
  75.  
  76.    :param bucket: Имя бакета в InfluxDB.
  77.    :param measurement: Имя измерения в InfluxDB.
  78.    :param start: Начальная точка временного диапазона (например, '-1h', '-1d').
  79.    :param stop: Конечная точка временного диапазона (по умолчанию 'now()').
  80.    """
  81.     async with InfluxDBClientAsync(url=url, token=token, org=org) as client:
  82.         query_api = client.query_api()
  83.  
  84.         query = f'''
  85.        from(bucket: "{bucket}")
  86.        |> range(start: {start}, stop: {stop})
  87.        |> filter(fn: (r) => r._measurement == "{measurement}")
  88.        '''
  89.         result = await query_api.query(query)
  90.         logger.debug('Fetch data from InfluxDB', result=result)
  91.  
  92.         for table in result:
  93.             for record in table.records:
  94.                 logger.debug(f'Time: {record.get_time()}, Value: {record.get_value()}')
  95.  
  96.  
  97. async def main():
  98.     await fetch_data('fundingRate', '-1d')
  99.  
  100.  
  101. if __name__ == "__main__":
  102.     asyncio.run(main())
  103.  
  104.  
  105.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement