MeowalsoMeow

convoy shipping stage 3(passed)

Sep 19th, 2022
132
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.10 KB | None | 0 0
  1. import pandas as pd
  2. import csv
  3. import re
  4. import sqlite3
  5. #import tempfile
  6. from tempfile import NamedTemporaryFile
  7. import shutil
  8. import os
  9.  
  10.  
  11. # When it is run a second time, there will be two [CHECKED]
  12.  
  13. # When writing make sure to add 'delimiter=",", lineterminator="\n"' to the writer or there will be empty lines
  14. # Use str.isdigit(), not isinstance(), so that digit strings are covered, e.g. '5'
  15. def xlsx_to_csv():
  16. input_file_name = input('Input file name\n')
  17. if input_file_name.endswith('.csv'):
  18. csv_file_name = input_file_name
  19. else:
  20. my_df = pd.read_excel(input_file_name, sheet_name='Vehicles', dtype=str)
  21. # csv_file_name = input_file.removesuffix('.xlsx') + '.csv'
  22. csv_file_name = input_file_name.replace('.xlsx', '.csv')
  23. # print(csv_file_name)
  24. my_df.to_csv(csv_file_name, index=False)
  25. line_num = my_df.shape[0]
  26. print(f'{line_num} line{"s were" if line_num > 1 else " was"} imported to {csv_file_name}')
  27. return csv_file_name
  28.  
  29.  
  30. def clean_csv(csv_file_name):
  31. #csv_checked_name = csv_file_name.removesuffix('.csv') + '[CHECKED]' + '.csv'
  32. csv_checked_name = csv_file_name.removesuffix(
  33. '.csv') + '[CHECKED]' + '.csv' if '[CHECKED]' not in csv_file_name else csv_file_name
  34. # print('csv_checked_name', csv_checked_name)
  35. checked_file = NamedTemporaryFile('w', encoding='utf-8', delete=False)
  36. #with open(csv_file_name, 'r+') as csv_file, tempfile.NamedTemporaryFile('w', encoding='utf-8', delete=False) as checked_file:
  37. with open(csv_file_name, 'r+') as csv_file, checked_file:
  38. file_reader = csv.reader(csv_file, delimiter=",")
  39. file_writer = csv.writer(checked_file, delimiter=",", lineterminator="\n")
  40. count = 0
  41. wrong_data_count = 0
  42. for line in file_reader:
  43. if count == 0:
  44. file_writer.writerow(line)
  45. else:
  46. for cell in line:
  47. if not cell.isnumeric():
  48. wrong_data_count += 1
  49. line[line.index(cell)] = int(re.findall(r'\d+', cell)[0]) # need to replace the cell in line
  50. file_writer.writerow(line)
  51. count += 1
  52. shutil.move(checked_file.name, csv_file_name)
  53. try:
  54. os.rename(csv_file_name, csv_checked_name)
  55. except WindowsError:
  56. os.remove(csv_checked_name)
  57. os.rename(csv_file_name, csv_checked_name)
  58.  
  59. print(f'{wrong_data_count} cells were corrected in {csv_checked_name}')
  60. # print(csv_checked_name, type(csv_checked_name)) returns string
  61. # print(checked_file, type(checked_file)) returns io wrapper
  62. return csv_checked_name
  63.  
  64.  
  65. def csv_to_s3db(cleaned_csv):
  66. database = cleaned_csv.removesuffix('[CHECKED].csv') + '.s3db'
  67. # print('database', database)
  68. conn = sqlite3.connect(database)
  69. cursor_name = conn.cursor()
  70. with open(cleaned_csv, 'r') as csv_file:
  71. file_reader = csv.reader(csv_file, delimiter=",")
  72. count = 0
  73. for line in file_reader:
  74. if count == 0:
  75. cursor_name.execute('''
  76. CREATE TABLE IF NOT EXISTS convoy (
  77. [vehicle_id] INT PRIMARY KEY,
  78. [engine_capacity] INT NOT NULL,
  79. [fuel_consumption] INT NOT NULL,
  80. [maximum_load] INT NOT NULL
  81. );
  82. ''')
  83. else:
  84. cursor_name.execute('''
  85. INSERT INTO convoy (vehicle_id, engine_capacity, fuel_consumption, maximum_load)
  86. VALUES (?,?,?,?);
  87. ''', line)
  88. count += 1
  89. # cursor_name.execute("SELECT * FROM convoy")
  90. # print('table', cursor_name.fetchall())
  91. count -= 1 if count > 0 else 0
  92. conn.commit()
  93. conn.close()
  94. print(f'{count} record{"s were" if count > 1 else " was"} inserted into {database}')
  95.  
  96.  
  97. def main():
  98. file_name = xlsx_to_csv()
  99. # print('filename', file_name)
  100. cleaned_csv = clean_csv(file_name)
  101. # print('cleanned csv', cleaned_csv)
  102. csv_to_s3db(cleaned_csv)
  103.  
  104.  
# Start the pipeline only when this file is executed directly, not on import.
if __name__ == '__main__':
    main()
  107.  
Advertisement
Add Comment
Please, Sign In to add comment