Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- # from tqdm import tqdm
- # import fix_price_email
- # from abc import ABCMeta, abstractmethod
- import datetime
- import logging
- import os
- import time
- import pandas as pd
# Route every log record (DEBUG and above) to a log file in the working directory.
logging.basicConfig(
    level=logging.DEBUG,
    filename='CSSI Slip Log.log',
)
class Slip:
    """
    Class for slip creation. Transforms json to text and puts text file in a folder.
    Input:
    <root:str> - path to the cssi_folder
    <json_path:str> - path + filename of a slip (json file)
    """

    @staticmethod
    def inn_check(inn):
        """
        Returns True if inn of the given slip is in the list of the FixPrice inns.
        The list is the 'inn' column of 'inn_verified.xlsx', opened relative to the working directory.
        :param inn: inn of the slip, anything int() accepts
        :return: bool
        :raises ValueError: if inn cannot be converted to int
        """
        inn_verified = pd.read_excel('inn_verified.xlsx')
        return int(inn) in inn_verified['inn'].to_list()

    @classmethod
    def check_json(cls, root, json_path):
        """
        Method to see if we can create a slip in the first place. If False then class instance won't appear.
        It checks if json file can be read, if it is empty and verifies inn. Removes slip if inn is wrong.
        :param root: cssi folder (not used by the check, kept for interface compatibility)
        :param json_path: path to the slip
        :return: bool
        """
        logging.info(f'Starting check {json_path}')
        try:
            json_table = pd.read_json(json_path, encoding='utf-8', orient='values')
        except (OSError, ValueError):
            # Unreadable path (directory, locked or missing file) or malformed json:
            # the slip cannot be built, so report it instead of failing later on an unbound name.
            logging.warning(f'Error in file {json_path}. Json could not be read')
            return False
        # Emptiness has to be tested before the first row is touched.
        if json_table.empty or 'userInn' not in json_table.columns:
            logging.warning(f'Error in file {json_path}. This slip was ignored')
            return False
        inn = str(json_table.iloc[0]['userInn'])
        if not cls.inn_check(inn):
            os.remove(json_path)
            return False
        return True

    def __init__(self, root, json_path):
        """
        Reads the json slip and caches the fields needed for the text slip.
        :param root: cssi folder
        :param json_path: path to the slip
        :raises ValueError: if no store_id can be resolved for the cassa of the slip
        """
        logging.info(f'Reading {json_path}')
        self.root = root
        self.json_path = json_path
        self.json_table = pd.read_json(json_path, encoding='utf-8', orient='values')
        self.item_table = self.get_items(self.json_table)
        self.json_table_row = self.json_table.loc[0, :]
        self.total = str(self.json_table_row['ecashTotalSum'] + self.json_table_row['cashTotalSum'])
        self.cassa_code = str(self.json_table_row['kktRegId'])
        self.seq_number = str(self.json_table_row['fiscalDocumentNumber'])
        # Expected to be datetime-like: its year/month/day/hour/minute attributes are read below.
        self.date = self.json_table_row['dateTime']
        stamp = self.date
        self.cssi_date = f'{stamp.year}{stamp.month:02d}{stamp.day:02d}'
        self.cssi_time = f'{stamp.hour:02d}{stamp.minute:02d}'
        self.slip_name = f'{self.cassa_code}_{self.seq_number}.txt'
        self.store_code = self.shop_id_from_slip_address()

    def shop_id_from_slip_address(self):
        """
        Looks up self.cassa_code in 'cassa_to_store_id.xlsx' (columns 'cassa' and 'store_id').
        A missing row or a store_id of 0 means that no mapping exists and cssi_person has to add it manually.
        :return: <shop_id:str> - shop_id for session folder creation
        :raises ValueError: if the cassa has no row in the mapping file or its store_id is 0
        """
        # No 'encoding' keyword: it has no meaning for an Excel workbook and
        # newer pandas releases reject it with a TypeError.
        shop_frame = pd.read_excel('cassa_to_store_id.xlsx')
        matches = shop_frame.loc[shop_frame['cassa'] == int(self.cassa_code)]
        if matches.empty or matches.iloc[0]['store_id'] == 0:
            # TODO: create a windows message with buttons
            logging.warning(f'Error in linkage shop_id_from_slip_address')
            raise ValueError(f'No store_id is mapped to cassa {self.cassa_code}')
        return str(matches.iloc[0]['store_id'])

    @staticmethod
    def get_items(dataframe):
        """
        Extracts a list of jsons from ['items'] column in the original json.
        Each entry is a line with a product.
        :param dataframe: pandas df - json file
        :return: dataframe: pandas df - dataframe with all products from a slip
                 (one row per product, every row is labelled 'items'); empty if there are no rows
        """
        from io import StringIO  # local import: only this method needs it

        frames = []
        for _, row in dataframe.loc[:, ['items']].iterrows():
            # row.to_json() gives '{"items": {<product fields>}}'. Read back, it is a single
            # column indexed by field name, so the transpose is one row with a column per field.
            # StringIO is used because passing a literal json string to read_json is deprecated.
            frames.append(pd.read_json(StringIO(row.to_json())).transpose())
        if not frames:
            return pd.DataFrame()
        # A single concat replaces DataFrame.append, which pandas 2.0 removed.
        return pd.concat(frames)

    @staticmethod
    def convert_price(price):
        """
        Receives price * 100. Converts to decimal format
        :param price - str
        :return: str - price for text file, e.g. '12345' -> '123,45' and '5' -> '0,05'
        """
        # Pad to three digits so amounts below 1.00 keep a leading zero and two decimals.
        padded = price.zfill(3)
        return padded[:-2] + ',' + padded[-2:]

    @staticmethod
    def time_in_range(start, end, my_time):
        """
        Check if time is in range. Start is exclusive, end is inclusive.
        :param start: datetime.time format
        :param end: datetime.time format
        :param my_time: datetime.time format
        :return: bool
        """
        return start < my_time <= end

    @classmethod
    def get_session(cls, slip_time):
        """
        Return text for folder name.
        :param slip_time: time on the slip (hour, minute and second attributes are read)
        :return: str - session time for folder creation, '_out_of_bounds' outside of 09:00-21:00
        """
        time_on_slip = datetime.time(slip_time.hour, slip_time.minute, slip_time.second)
        sessions = (
            (datetime.time(9, 0, 0), datetime.time(12, 0, 0), "_0900_1200"),
            (datetime.time(12, 0, 0), datetime.time(15, 0, 0), "_1200_1500"),
            (datetime.time(15, 0, 0), datetime.time(18, 0, 0), "_1500_1800"),
            (datetime.time(18, 0, 0), datetime.time(21, 0, 0), "_1800_2100"),
        )
        for start, end, label in sessions:
            if cls.time_in_range(start, end, time_on_slip):
                return label
        return "_out_of_bounds"

    def create_folder(self):
        """
        Builds the name of the session folder for this slip. Nothing is created on disk here.
        :return: str - session folder name
        """
        chain_name = 'RUSSIAN-FIXPRICE_'
        session = self.get_session(self.date)
        return chain_name + self.store_code + '_' + self.cssi_date + session

    def prepare_items(self):
        """
        Formats data on the slip: name is cut/padded to 50 characters, quantity and price are
        padded to 15 characters, price is converted with convert_price.
        :return: None
        """
        items = self.item_table
        if items.empty:
            return
        # Whole columns are rebuilt instead of writing into iterrows() rows:
        # those rows may be copies, in which case the changes never reach the table.
        prices = [self.convert_price(str(price)) for price in items['price']]
        items['name'] = ['{:<50}'.format(name[:50]) for name in items['name']]
        items['quantity'] = ['{:<15}'.format(int(quantity)) for quantity in items['quantity']]
        # NOTE(review): 'sum' is overwritten with the formatted unit price, not with
        # quantity * price. Kept as it was - confirm that this is intended.
        items['sum'] = prices
        items['price'] = ['{:<15}'.format(price) for price in prices]

    def create_txt_slip_head(self):
        """
        Creates a head of the slip. The text file <cassa_code>_<seq_number>.txt is (re)written in root.
        :return: None
        """
        # 218515031227_65179
        with open(os.path.join(self.root, self.slip_name), 'w', encoding='utf-8') as text_file:
            text_file.write('\ufeff')
            text_file.write('Store code: ' + self.store_code + '\n')
            text_file.write('Total: ' + self.convert_price(self.total) + '\n')
            text_file.write('Date: ' + self.cssi_date + '\n')
            text_file.write('Time: ' + self.cssi_time + '\n')
            text_file.write('Cassa code: ' + self.cassa_code + '\n')
            text_file.write('Seq. number: ' + self.seq_number + '\n')
            text_file.write('Image file: test_test_test\n')

    def create_txt_slip_body(self):
        """
        Creates a table with products that is appended to the head.
        Every product is one line: name, quantity, price and sum written back to back.
        :return: None
        """
        items = self.item_table.loc[:, ['name', 'quantity', 'price', 'sum']]
        with open(os.path.join(self.root, self.slip_name), 'a', encoding='utf-8') as text_file:
            for row in items.values.tolist():
                # str() keeps the join working even if a cell was left unformatted.
                text_file.write(''.join(str(cell) for cell in row) + '\n')

    def create_text_file(self):
        """
        Groupping function for creation: writes the head and then appends the body.
        :return: None
        """
        self.create_txt_slip_head()
        self.create_txt_slip_body()
class CashSlipManager:
    """
    It has to be a facade for the download manager and json converter.
    Currently it can create a folder and convert json to text.
    """

    # TODO: add download_slips(date_start, date_end) that launches fix_price_email for the
    #       selected dates; fix_price_email has to be rewritten as a class or plain functions,
    #       the path to save and the dates are to be selected in the client class.

    @staticmethod
    def session_folder_create(root, session_folder, now):
        """
        Creates a folder for upload session in the cssi. All the session folders created will go in here.
        Layout: <root>/output/<YYYYMMDDhhmmss>_RUSSIAN-FIXPRICE_ILYA/<session_folder>.
        Folders that are missing (including 'output') are created, existing ones are reused.
        :param root: folder of the cssi program
        :param session_folder - name of the session folder
        :param now: datetime.datetime format - current time
        :return: str - output path
        """
        chain_name = 'RUSSIAN-FIXPRICE_'
        time_part = (f'{now.year}{now.month:02d}{now.day:02d}'
                     f'{now.hour:02d}{now.minute:02d}{now.second:02d}')
        session_path = os.path.join(root, 'output', time_part + '_' + chain_name + 'ILYA')
        final_path = os.path.join(session_path, session_folder)
        for path in (session_path, final_path):
            if not os.path.isdir(path):
                # makedirs also creates a missing 'output' parent, which os.mkdir cannot do.
                os.makedirs(path, exist_ok=True)
                logging.warning(f'Folder {path} was created')
        return final_path

    def json_to_txt(self, root_path, filename, session_time):
        """
        This does the main lifting. It creates a slip with the interface of Slip class. Creates a folder
        and moves all the files. Checks for file existence and can delete files.
        :param root_path:str - file path
        :param filename:str - filename
        :param session_time:datetime.datetime format - current time
        :return: str - fail/success which is used to count errors and number of slips
        """
        slip_path = os.path.join(root_path, filename)
        if not Slip.check_json(root_path, slip_path):
            return 'fail'
        try:
            new_slip = Slip(root_path, slip_path)
        except (ValueError, LookupError):
            # A slip with a missing field or without a cassa -> store mapping is reported
            # as a failure, so one bad file does not abort the whole run.
            logging.exception(f'Slip {slip_path} could not be created')
            return 'fail'
        new_slip.prepare_items()
        new_slip.create_text_file()
        new_path = self.session_folder_create(root_path, new_slip.create_folder(), session_time)
        txt_source = os.path.join(root_path, new_slip.slip_name)
        txt_target = os.path.join(new_path, new_slip.slip_name)
        try:
            os.rename(txt_source, txt_target)
        except FileExistsError:
            # Cash slip already exists in the session folder: drop the fresh copy.
            os.remove(txt_source)
        parsed_dir = os.path.join(root_path, 'parsed')
        os.makedirs(parsed_dir, exist_ok=True)
        try:
            os.rename(slip_path, os.path.join(parsed_dir, filename))
        except FileExistsError:
            # The json is already in 'parsed': remove the text slip from the session folder.
            os.remove(txt_target)
        return 'success'
class CssiUser(CashSlipManager):
    """
    Worker class that inherits functions from the SlipManager. This class is a client class and does logging.
    """

    def __init__(self, output_path):
        """
        :param output_path: folder with the json slips to convert
        """
        self.output_path = output_path

    def cssi_input(self):
        """
        Converts every file of output_path with json_to_txt and logs how many slips were
        created, which files failed and how long the run took.
        :return: None
        """
        # One directory snapshot, files only: sub-folders are not slips
        # and must neither be parsed nor counted.
        files = [name for name in os.listdir(self.output_path)
                 if os.path.isfile(os.path.join(self.output_path, name))]
        slip_number = len(files)
        count = 0
        errors = 0
        error_files = []
        start = time.perf_counter()
        current_time = datetime.datetime.now()
        for file in files:
            if self.json_to_txt(self.output_path, file, current_time) == 'success':
                count += 1
                logging.warning(f'Cash slip {count}/{slip_number} was created')
            else:
                errors += 1
                error_files.append(os.path.join(self.output_path, file))
                logging.warning('Cash slip was not created.')
        logging.warning(f'Total cash slips run: {count}/{slip_number}')
        logging.warning(f'Number of errors in slips: {errors}. Problematic files are:')
        for file in error_files:
            logging.warning(f'{file}')
        end = time.perf_counter()
        # Truncate the difference, not each operand: int(end) - int(start) can be off by a second.
        logging.warning(f'Total time taken {int(end - start)} seconds')
# Folder that is processed when main() is called without an argument.
DEFAULT_OUTPUT_PATH = 'C:\\Users\\SaIl9001\\AppData\\Local\\Programs\\Python\\Python37-32\\cssi_python_script\\output'


def main(output_path=DEFAULT_OUTPUT_PATH):
    """
    Launch script
    :param output_path: folder with the json slips; defaults to DEFAULT_OUTPUT_PATH
    :return: None
    """
    user = CssiUser(output_path)
    user.cssi_input()


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement