Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- """
- lecture du fichier ListeCourses_Extract.csv
- la carte client est indexée sous NUM_CLIENT, la DATE_ACHAT est transformée en dateTime
- toutes les cartes avec les index sont écrits dans le fichier /tmp/carteNum
- la date de dernier achat +1 (pour toutes les cartes, tous les produits) est calculée et stockeé dans le fichier /tmp/nextDate
- les données cartes (NUM_CLIENT), code produit (CODE_PRODUIT), date d'achat (DATE_ACHAT) sont réparties aléatoirement en groupe de MAX_N cartes dans des fichiers /tmp/groupX
- A remplacer pour optimisation par le fichier ListeCourses_Eval (10 fois plus petit)
- ou par des fichiers adaptés :
- - date de dernier achat
- - liste des cartes avec index (par exemple tri sur hashage de la carte)
- """
- import pandas as pd
- import os
- import sys
- import logging
- from random import shuffle
- import numpy as np
- import configparser
- def load_file(DATA_IN, filename):
- """Chargez le fichier CSV"""
- logging.info('load_file: %s' % filename)
- # Si chargement massive obtiens problèmes de mémoire utiliser multiprocessing - Oxford Protein Informatics Group
- df = pd.read_csv(DATA_IN + filename, header=0, dtype={'nucaecmx': 'uint64', 'cdtxe': 'uint16'})
- df.columns = ['CARTE_CLIENT', 'CODE_PRODUIT', 'DATE_ACHAT']
- # C'est plus rapide de parser les dates après pd.read_csv(). format='%Y-%m-%d'
- df['DATE_ACHAT'] = pd.to_datetime(df['DATE_ACHAT'])
- # CARTE_CLIENT devient une catégorie et non une donnée numérique
- df['CARTE_CLIENT'] = df['CARTE_CLIENT'].astype('category')
- # indexation des cartes 0 = carte 1 du fichier ...
- df['NUM_CLIENT'] = df['CARTE_CLIENT'].cat.codes.astype('uint32')
- print(df.head(10))
- logging.info('load_file OK')
- return df
- def pickle_key_carte_num_client(df):
- """Extrait DataFrame Temp de correspondance NUM_CLIENT (réduire dtype + meilleur parallélisation CPU après"""
- logging.info('pickle_key_carte_num_client')
- # distinct CARTE_CLIENT, NUM_CLIENT
- carteNum = df.drop(['CODE_PRODUIT', 'DATE_ACHAT'], axis=1)
- carteNum = carteNum.drop_duplicates()
- carteNum = carteNum.reset_index(drop=True)
- # écriture fichier compressé lien CARTE_CLIENT / NUM_CLIENT : carteNum
- carteNum.to_pickle(DATA_TEMP + 'carteNum', compression='infer')
- # suppression de la carte client dans le DataFrame ListeCourses
- df = df.drop(['CARTE_CLIENT'], axis=1)
- print(carteNum.head(10))
- logging.info('pickle_key_carte_num_client OK')
- return df
- def pickle_next_date(df, DATA_TEMP):
- """Créer un fichier pickle avec la prochaine date. nextDate est nécessaire pour parser les données"""
- logging.info('pickle_next_date')
- # recherche de la dernière date d'achat du fichier et calcul date + 1
- nextDate = pd.DataFrame(data={'nextDate': [sorted(df['DATE_ACHAT'].unique())[-1] + np.timedelta64(1, 'D')]})
- # ecriture dans le fichier nextDate
- nextDate.to_pickle(DATA_TEMP + 'nextDate', compression='infer')
- print(nextDate.head(10))
- logging.info('pickle_next_date OK')
- def grouped_client_list(df, n=10):
- """Produit des groupes clients mélangé avec maximum n clients
- permet de distribuer le multitraitement CPU de manière égale (serveur HC005...: n=20000)"""
- logging.info('grouped_client_list')
- # creation de la liste des num clients + index
- # creation de la liste unique des clients
- list_clients = list(df['NUM_CLIENT'].unique())
- len_list_clients = len(list_clients)
- clientNum_all = df.drop(['CODE_PRODUIT', 'DATE_ACHAT'], axis=1)
- indexlisteclient = []
- for c in list_clients:
- ind = clientNum_all.loc[clientNum_all['NUM_CLIENT']==c].index
- indexlisteclient.append([c,ind])
- shuffle(list_clients)
- # découpage en n fichiers max
- liste_decoup = []
- i_first = 0
- quotient = len_list_clients // n
- reste = len_list_clients % n
- for i in range(0,n):
- if (i<reste):
- i_nb = quotient + 1
- else:
- i_nb = quotient
- liste_decoup.append([i_first, i_first + i_nb])
- i_first = i_first + i_nb
- print("n = " + str(n))
- print("nb clients = " + str(len_list_clients))
- print(liste_decoup)
- grouped_list = [[i,list_clients[liste_decoup[i][0]:liste_decoup[i][1]]] for i in range(0, n)]
- print(grouped_list)
- # les cartes sont réparties par groupes de n de manière aléatoires
- logging.info('--- %s clients en groupes maximum %s fichiers' % (len_list_clients, n))
- logging.info('grouped_client_list OK')
- return grouped_list
- def pickle_client_groups(df, DATA_TEMP, glist, compression='infer'):
- """Utilisez glist pour créer des fichiers avec mois de client. Export = fichier pickle"""
- logging.info('pickle_client_groups')
- logging.info('recherche des codes uniques')
- clientNum_all = df.drop(['CODE_PRODUIT', 'DATE_ACHAT'], axis=1)
- listeclient = list(df['NUM_CLIENT'].unique())
- indexlisteclient = []
- for c in listeclient:
- ind = clientNum_all.loc[clientNum_all['NUM_CLIENT']==c].index
- indexlisteclient.append([c,ind])
- # ecriture des fichiers groupX découpage des données
- for i in range(0,len(listeclient)):
- grouped_df = df.iloc[indexlisteclient[i][1]]
- #print(grouped_df.head(10))
- #grouped_df.to_pickle(DATA_TEMP + 'group%s' % indexlisteclient[i][0], compression=compression)
- logging.info('pickle_client_groups OK')
- if __name__ == "__main__":
- # --- variables d'environnement
- DATA_IN = ''
- DATA_OUT = 'out/'
- DATA_TEMP = 'tmp/'
- LOG = 'log/'
- # création du répertoire LOG dans le bucket de sortie
- if not(os.path.isdir("log")):
- os.mkdir("log")
- if not(os.path.isdir("tmp")):
- os.mkdir("tmp")
- if not(os.path.isdir("out")):
- os.mkdir("out")
- # --- lecture du fichier configuration
- config = configparser.ConfigParser()
- config.read('configAO.ini', encoding='utf-8-sig')
- print(config)
- FILE_NAME = "ListeCourses_Extract_1000.csv"
- N_CPU = 5
- LOG_NAME = "compute-sfm.log"
- try:
- # -- declaration des logs
- logger = logging.getLogger('LOGGER_NAME')
- logging.basicConfig(level=logging.getLevelName('INFO'), format="%(asctime)s %(levelname)s %(message)s", filemode="w", filename=LOG + LOG_NAME)
- logging.info('--- compute-sfm.py')
- # charger fichier
- df = load_file(DATA_IN, FILE_NAME)
- # transformation en NUM_CLIENT et éxportation clé
- df = pickle_key_carte_num_client(df)
- # nextDate est nécessaire pour parser les données
- pickle_next_date(df, DATA_TEMP)
- # diviser les clients en groupes et exporter
- glist = grouped_client_list(df, n=N_CPU)
- pickle_client_groups(df, DATA_TEMP, glist, compression='infer')
- except Exception as error:
- # imprime le traceback de l'erreur dans le log
- print(error)
- logger.exception(error)
- # exit avec status n = 999 --> transmettre l'erreur au log du fichier d'exécution principal également
- sys.exit(999)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement