Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Base names, one per wine category.
names = [
    "white_wine",
    "red_wine",
    "rose_wine",
    "sparkling_wine",
]

# Per-category objects defined elsewhere in the file (presumably the loaded
# dataframes -- confirm), listed in the same order as `names`.
files = [
    white_wine,
    red_wine,
    rose_wine,
    sparkling_wine,
]
def process_csv(dataframes, categories):
    """Aggregate each dataframe per winery and dump it to ``<category>.json``.

    For every ``(dataframe, category)`` pair the rows are grouped by
    ``Country``, ``Region`` and ``Winery``, the ``Rating`` and ``Price``
    columns are averaged, and the result is sorted by ``Country``.  Two
    string columns are then added before the records are written out:

    * ``index`` -- consecutive 1-based row number, as a string.
    * ``id`` -- a UUID hex string generated separately for each row.

    Args:
        dataframes: Iterable of objects exposing the
            ``groupBy(...).avg(...).sort(...).toPandas()`` chain
            (presumably PySpark DataFrames -- confirm against the caller).
        categories: Iterable of output base names, paired positionally with
            ``dataframes``; each result is written to ``f"{category}.json"``.

    Returns:
        None.  The only output is the JSON files, written as a single JSON
        array of records.
    """
    for dataframe, category in zip(dataframes, categories):
        aggregated = (
            dataframe.groupBy("Country", "Region", "Winery")
            .avg("Rating", "Price")
            .sort("Country")
        )
        records = aggregated.toPandas()
        row_count = len(records)

        # Number the rows after collecting them: Spark's
        # monotonically_increasing_id() is unique but not consecutive, so it
        # cannot provide a dense 1..n index.
        records["index"] = [str(position) for position in range(1, row_count + 1)]
        # uuid1() is called once per row so every record gets its own id; a
        # single call wrapped in lit() would give all rows the same value.
        records["id"] = [uuid.uuid1().hex for _ in range(row_count)]

        records.to_json(
            f"{category}.json",
            orient="records",
            force_ascii=False,
            lines=False,
        )
def send_files_to_cosmos(
    file_path: str,
    database_name: str,
    container_name: str,
    cosmos_connection_string: str,
) -> dict:
    """Create one Cosmos item for every record stored in a JSON file.

    The file is parsed with ``json.load`` and must contain an iterable of
    objects (e.g. a JSON array), each carrying an ``"id"`` key.  Every record
    is passed to ``create_item`` on the container obtained through
    ``CosmosClient``.

    Args:
        file_path: Path of the JSON file to read; opened exactly as given,
            with UTF-8 decoding.
        database_name: Name handed to ``get_database_client``.
        container_name: Name handed to ``get_container_client``.
        cosmos_connection_string: Connection string handed to
            ``CosmosClient.from_connection_string``.

    Returns:
        A dict mapping each record's ``"id"`` to the record itself.

    Raises:
        OSError: If ``file_path`` cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If a record has no ``"id"`` key (raised before anything is
            uploaded).
        Any exception raised by the Cosmos client is propagated unchanged.
    """
    with open(file_path, "r", encoding="UTF-8") as file:
        records = json.load(file)

    # Index by id up front so a malformed record aborts before any upload.
    records_by_id = {record["id"]: record for record in records}

    cosmos_client = CosmosClient.from_connection_string(cosmos_connection_string)
    # Resolve the container once instead of once per uploaded item.
    container = cosmos_client.get_database_client(database_name).get_container_client(container_name)

    # Upload the records themselves; nothing guarantees that their ids form a
    # contiguous "1".."n" sequence that could be enumerated by number.
    for record in records:
        container.create_item(record)

    return records_by_id
# Upload each category's export.  process_csv() writes its output to
# "<category>.json" and send_files_to_cosmos() opens the path exactly as
# given, so the extension has to be part of the path passed here.
# NOTE(review): assumes process_csv() was called with `names` as its
# categories -- confirm against the call site.
for name in names:
    send_files_to_cosmos(
        file_path=f"{name}.json",
        database_name=DATABASE_NAME,
        container_name=CONTAINER_NAME,
        cosmos_connection_string=COSMOS_CONNECTION_STRING,
    )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement