Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pandas as pd
- import numpy as np
- import requests
- import pandahouse as ph
- # путь к большой таблице на 6 млн строк
- fPath = "C:\\GA_R\\csv\\"
- # грузим таблицу в пандафрейм
- GAaud_hist = pd.read_csv(fPath+'GAaud_hist'+'.csv',sep=';', decimal=',')
- # добавляем столбец с индексом
- GAaud_hist['index'] = np.arange(1, len(GAaud_hist) + 1)
- print(f'Сформированы датафреймы для заливы в КХ \n {len(GAaud_hist)} ')
- # настройки коннекта к КХ на Hetzner
- db = 'GA'
- connection_Hetzner = {'host': 'http://xx.xx.xxx.154:8123',
- 'database': db,
- 'user': 'default',
- 'password': 'hDmTNbXqtC0LY'
- }
- connection = connection_Hetzner
- # формуруем запрос на создание таблицы в КликХаузе
- # вытягиваем все названия и типы столбцов из панды
- GAaud = pd.DataFrame(GAaud_hist.dtypes)
- # переводим индекс в столбец под названием Index
- GAaud.reset_index(level=0, inplace=True)
- # в столбце Index каждому значениею добавляем двойную кавычку, так как названия столбцов в КликХаузе должны быть в двойных кавычках
- GAaud['index'] = GAaud['index'].apply(lambda x: f'"{x}"')
- # словарь соответствия типов данных панды и типов данных КликХауза
- types = {'object': 'String', 'int64': 'UInt64', 'float64': 'Float64', 'int32': 'UInt32'}
- # переводим названия типов из нотации Питона в нотацию Кликхауза
- GAaud = GAaud.replace({0: types})
- # составляем строку запроса для КХ из таблицы cnt где теперь содержатся названия столбцов в двойных кавычках и типы данных в этих столбцах для КХ
- GAaud['clickhouse'] = GAaud['index'].str.cat(GAaud[0], sep=" ") # сначала получаем столбец состоящий из двух столбцов чтобы получить название столбца и название типа данных Например: "created" Strina
- ch_columns = GAaud['clickhouse'].tolist() # переводим полученный столбец в список, чтобы потом перевести в строку
- ch_columns = "\n,".join(str(x) for x in ch_columns) # переводим полученный список в строку где элементы списка разделены переносом строки и запятой
- # Формируем окончательную строку запроса в КХ
- comand = 'CREATE TABLE IF NOT EXISTS' # команда для Кликхауза.
- table_name = 'GAaud_hist' # название таблицы в Кликхаузе
- ch_query = f'{comand} "{db}"."{table_name}"\n({ch_columns}) \n ENGINE = MergeTree \n ORDER BY ("index")'
- # -- удаляем старую таблицу из Кликхауза с данными из прошлой заливки Датасета
- ph.execute(query=f'DROP TABLE IF EXISTS "{db}"."{table_name}"', connection=connection)
- # выполняем запрос qq
- ph.execute(query=ch_query, connection=connection)
- # записываем данные из pandas в clickhouse
- ph.to_clickhouse(GAaud_hist, table_name, index=False, connection=connection, chunksize=100000)
- print(f'Конец заливки {db}.{table_name}')
Advertisement
Add Comment
Please, Sign In to add comment