Advertisement
Guest User

sim

a guest
May 22nd, 2019
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.14 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. """
  3.  
  4. lecture du fichier ListeCourses_Extract.csv
  5. la carte client est indexée sous NUM_CLIENT, la DATE_ACHAT est transformée en dateTime
  6. toutes les cartes avec les index sont écrits dans le fichier /tmp/carteNum
  7. la date de dernier achat +1 (pour toutes les cartes, tous les produits) est calculée et stockeé dans le fichier /tmp/nextDate
  8. les données cartes (NUM_CLIENT), code produit (CODE_PRODUIT), date d'achat (DATE_ACHAT) sont réparties aléatoirement en groupe de MAX_N cartes dans des fichiers /tmp/groupX
  9.  
  10. A remplacer pour optimisation par le fichier ListeCourses_Eval (10 fois plus petit)
  11. ou par des fichiers adaptés :
  12. - date de dernier achat
  13. - liste des cartes avec index (par exemple tri sur hashage de la carte)
  14.  
  15. """
  16.  
  17. import pandas as pd
  18. import os
  19. import sys
  20. import logging
  21. from random import shuffle
  22. import numpy as np
  23. import configparser
  24.  
  25.  
  26. def load_file(DATA_IN, filename):
  27. """Chargez le fichier CSV"""
  28. logging.info('load_file: %s' % filename)
  29. # Si chargement massive obtiens problèmes de mémoire utiliser multiprocessing - Oxford Protein Informatics Group
  30. df = pd.read_csv(DATA_IN + filename, header=0, dtype={'nucaecmx': 'uint64', 'cdtxe': 'uint16'})
  31. df.columns = ['CARTE_CLIENT', 'CODE_PRODUIT', 'DATE_ACHAT']
  32. # C'est plus rapide de parser les dates après pd.read_csv(). format='%Y-%m-%d'
  33. df['DATE_ACHAT'] = pd.to_datetime(df['DATE_ACHAT'])
  34. # CARTE_CLIENT devient une catégorie et non une donnée numérique
  35. df['CARTE_CLIENT'] = df['CARTE_CLIENT'].astype('category')
  36. # indexation des cartes 0 = carte 1 du fichier ...
  37. df['NUM_CLIENT'] = df['CARTE_CLIENT'].cat.codes.astype('uint32')
  38. print(df.head(10))
  39. logging.info('load_file OK')
  40. return df
  41.  
  42. def pickle_key_carte_num_client(df):
  43. """Extrait DataFrame Temp de correspondance NUM_CLIENT (réduire dtype + meilleur parallélisation CPU après"""
  44. logging.info('pickle_key_carte_num_client')
  45. # distinct CARTE_CLIENT, NUM_CLIENT
  46. carteNum = df.drop(['CODE_PRODUIT', 'DATE_ACHAT'], axis=1)
  47. carteNum = carteNum.drop_duplicates()
  48. carteNum = carteNum.reset_index(drop=True)
  49. # écriture fichier compressé lien CARTE_CLIENT / NUM_CLIENT : carteNum
  50. carteNum.to_pickle(DATA_TEMP + 'carteNum', compression='infer')
  51. # suppression de la carte client dans le DataFrame ListeCourses
  52. df = df.drop(['CARTE_CLIENT'], axis=1)
  53. print(carteNum.head(10))
  54. logging.info('pickle_key_carte_num_client OK')
  55. return df
  56.  
  57.  
  58. def pickle_next_date(df, DATA_TEMP):
  59. """Créer un fichier pickle avec la prochaine date. nextDate est nécessaire pour parser les données"""
  60. logging.info('pickle_next_date')
  61. # recherche de la dernière date d'achat du fichier et calcul date + 1
  62. nextDate = pd.DataFrame(data={'nextDate': [sorted(df['DATE_ACHAT'].unique())[-1] + np.timedelta64(1, 'D')]})
  63. # ecriture dans le fichier nextDate
  64. nextDate.to_pickle(DATA_TEMP + 'nextDate', compression='infer')
  65. print(nextDate.head(10))
  66. logging.info('pickle_next_date OK')
  67.  
  68.  
  69. def grouped_client_list(df, n=10):
  70. """Produit des groupes clients mélangé avec maximum n clients
  71. permet de distribuer le multitraitement CPU de manière égale (serveur HC005...: n=20000)"""
  72. logging.info('grouped_client_list')
  73. # creation de la liste des num clients + index
  74. # creation de la liste unique des clients
  75. list_clients = list(df['NUM_CLIENT'].unique())
  76. len_list_clients = len(list_clients)
  77. clientNum_all = df.drop(['CODE_PRODUIT', 'DATE_ACHAT'], axis=1)
  78. indexlisteclient = []
  79. for c in list_clients:
  80. ind = clientNum_all.loc[clientNum_all['NUM_CLIENT']==c].index
  81. indexlisteclient.append([c,ind])
  82. shuffle(list_clients)
  83. # découpage en n fichiers max
  84. liste_decoup = []
  85. i_first = 0
  86. quotient = len_list_clients // n
  87. reste = len_list_clients % n
  88. for i in range(0,n):
  89. if (i<reste):
  90. i_nb = quotient + 1
  91. else:
  92. i_nb = quotient
  93. liste_decoup.append([i_first, i_first + i_nb])
  94. i_first = i_first + i_nb
  95. print("n = " + str(n))
  96. print("nb clients = " + str(len_list_clients))
  97. print(liste_decoup)
  98. grouped_list = [[i,list_clients[liste_decoup[i][0]:liste_decoup[i][1]]] for i in range(0, n)]
  99. print(grouped_list)
  100. # les cartes sont réparties par groupes de n de manière aléatoires
  101. logging.info('--- %s clients en groupes maximum %s fichiers' % (len_list_clients, n))
  102. logging.info('grouped_client_list OK')
  103. return grouped_list
  104.  
  105.  
  106. def pickle_client_groups(df, DATA_TEMP, glist, compression='infer'):
  107. """Utilisez glist pour créer des fichiers avec mois de client. Export = fichier pickle"""
  108. logging.info('pickle_client_groups')
  109. logging.info('recherche des codes uniques')
  110. clientNum_all = df.drop(['CODE_PRODUIT', 'DATE_ACHAT'], axis=1)
  111. listeclient = list(df['NUM_CLIENT'].unique())
  112. indexlisteclient = []
  113. for c in listeclient:
  114. ind = clientNum_all.loc[clientNum_all['NUM_CLIENT']==c].index
  115. indexlisteclient.append([c,ind])
  116. # ecriture des fichiers groupX découpage des données
  117. for i in range(0,len(listeclient)):
  118. grouped_df = df.iloc[indexlisteclient[i][1]]
  119. #print(grouped_df.head(10))
  120. #grouped_df.to_pickle(DATA_TEMP + 'group%s' % indexlisteclient[i][0], compression=compression)
  121. logging.info('pickle_client_groups OK')
  122.  
  123. if __name__ == "__main__":
  124.  
  125. # --- variables d'environnement
  126. DATA_IN = ''
  127. DATA_OUT = 'out/'
  128. DATA_TEMP = 'tmp/'
  129. LOG = 'log/'
  130.  
  131. # création du répertoire LOG dans le bucket de sortie
  132. if not(os.path.isdir("log")):
  133. os.mkdir("log")
  134. if not(os.path.isdir("tmp")):
  135. os.mkdir("tmp")
  136. if not(os.path.isdir("out")):
  137. os.mkdir("out")
  138.  
  139. # --- lecture du fichier configuration
  140. config = configparser.ConfigParser()
  141. config.read('configAO.ini', encoding='utf-8-sig')
  142. print(config)
  143. FILE_NAME = "ListeCourses_Extract_1000.csv"
  144. N_CPU = 5
  145. LOG_NAME = "compute-sfm.log"
  146.  
  147. try:
  148. # -- declaration des logs
  149. logger = logging.getLogger('LOGGER_NAME')
  150. logging.basicConfig(level=logging.getLevelName('INFO'), format="%(asctime)s %(levelname)s %(message)s", filemode="w", filename=LOG + LOG_NAME)
  151. logging.info('--- compute-sfm.py')
  152.  
  153. # charger fichier
  154. df = load_file(DATA_IN, FILE_NAME)
  155. # transformation en NUM_CLIENT et éxportation clé
  156. df = pickle_key_carte_num_client(df)
  157. # nextDate est nécessaire pour parser les données
  158. pickle_next_date(df, DATA_TEMP)
  159. # diviser les clients en groupes et exporter
  160. glist = grouped_client_list(df, n=N_CPU)
  161. pickle_client_groups(df, DATA_TEMP, glist, compression='infer')
  162.  
  163. except Exception as error:
  164. # imprime le traceback de l'erreur dans le log
  165. print(error)
  166. logger.exception(error)
  167. # exit avec status n = 999 --> transmettre l'erreur au log du fichier d'exécution principal également
  168. sys.exit(999)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement