Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import pandas as pd
- import csv
- import re
- import sqlite3
# When it is run a second time there will be two [CHECKED]; why is this?
def xlsx_to_csv():
    """Ask for an input file name and return the name of a CSV file to process.

    A name ending in ``.csv`` is returned untouched. Any other name is read
    as an Excel workbook (sheet ``Vehicles``, every cell as text) and saved
    as CSV under the same name with its extension replaced by ``.csv``; the
    number of imported lines is printed.

    Returns:
        The name of the CSV file.
    """
    import os  # local import: the block needs os.path.splitext only here

    input_file_name = input('Input file name\n')
    if input_file_name.endswith('.csv'):
        return input_file_name

    my_df = pd.read_excel(input_file_name, sheet_name='Vehicles', dtype=str)
    # Swap only the real extension. A plain str.replace('.xlsx', '.csv')
    # leaves names such as 'data.xls' unchanged, which would make to_csv()
    # overwrite the workbook itself, and it also rewrites '.xlsx' occurring
    # in the middle of a path.
    csv_file_name = os.path.splitext(input_file_name)[0] + '.csv'
    my_df.to_csv(csv_file_name, index=False)

    line_num = my_df.shape[0]
    # Singular only for exactly one line, so zero reads "0 lines were".
    print(f'{line_num} line{" was" if line_num == 1 else "s were"} imported to {csv_file_name}')
    return csv_file_name
def clean_csv(csv_file_name):
    """Write a copy of a CSV file in which data cells hold only integers.

    The first row is copied as a header. In every following row, each cell
    that is not purely numeric is replaced by the first run of digits found
    in it. Cells that contain no digits at all are left unchanged and are
    not counted as corrected. A summary line is printed.

    Args:
        csv_file_name: Path of the CSV file to clean.

    Returns:
        The path of the cleaned file: ``<name>[CHECKED].csv``, or
        ``csv_file_name`` itself when it already contains ``[CHECKED]``.
    """
    if '[CHECKED]' in csv_file_name:
        csv_checked_name = csv_file_name
    else:
        csv_checked_name = csv_file_name.removesuffix('.csv') + '[CHECKED].csv'

    # Read everything before the output is opened: when the input is already
    # a [CHECKED] file both names are equal, and opening it for writing
    # first would truncate the data before it could be read.
    with open(csv_file_name, 'r', encoding='utf-8', newline='') as csv_file:
        rows = list(csv.reader(csv_file, delimiter=","))

    wrong_data_count = 0
    for row in rows[1:]:  # rows[0] is the header and is copied verbatim
        for position, cell in enumerate(row):
            if cell.isnumeric():
                continue
            digits = re.search(r'\d+', cell)
            if digits is None:
                # Nothing to salvage (e.g. an empty cell); keep it as it is.
                continue
            row[position] = int(digits.group())
            wrong_data_count += 1

    with open(csv_checked_name, 'w', encoding='utf-8', newline='') as checked_file:
        file_writer = csv.writer(checked_file, delimiter=",", lineterminator="\n")
        file_writer.writerows(rows)

    print(f'{wrong_data_count} cell{" was" if wrong_data_count == 1 else "s were"} '
          f'corrected in {csv_checked_name}')
    return csv_checked_name
def csv_to_s3db(cleaned_csv):
    """Load a cleaned CSV file into the ``convoy`` table of an SQLite file.

    The database name is ``cleaned_csv`` with a trailing ``[CHECKED].csv``
    replaced by ``.s3db``. The first CSV row is treated as a header and is
    not inserted; the table is created (if missing) only when that header
    row exists. Every other row is inserted as
    ``(vehicle_id, engine_capacity, fuel_consumption, maximum_load)``.
    A summary line with the number of inserted records is printed.

    Args:
        cleaned_csv: Path of the CSV file to import.

    Raises:
        sqlite3.Error: If a row cannot be inserted; the transaction is
            rolled back and the connection is still closed.
    """
    database = cleaned_csv.removesuffix('[CHECKED].csv') + '.s3db'

    # Read the CSV first so a missing or unreadable file fails before any
    # database connection is opened.
    with open(cleaned_csv, 'r', encoding='utf-8', newline='') as csv_file:
        file_reader = csv.reader(csv_file, delimiter=",")
        header = next(file_reader, None)
        records = list(file_reader)

    conn = sqlite3.connect(database)
    try:
        # "with conn" commits on success and rolls back on error; it does
        # not close the connection, hence the surrounding try/finally.
        with conn:
            cursor_name = conn.cursor()
            if header is not None:
                cursor_name.execute('''
                    CREATE TABLE IF NOT EXISTS convoy (
                    [vehicle_id] INT PRIMARY KEY,
                    [engine_capacity] INT NOT NULL,
                    [fuel_consumption] INT NOT NULL,
                    [maximum_load] INT NOT NULL
                    );
                    ''')
            cursor_name.executemany('''
                INSERT INTO convoy (vehicle_id, engine_capacity, fuel_consumption, maximum_load)
                VALUES (?,?,?,?);
                ''', records)
    finally:
        conn.close()

    count = len(records)
    # Singular only for exactly one record, so zero reads "0 records were".
    print(f'{count} record{" was" if count == 1 else "s were"} inserted into {database}')
def main():
    """Run the pipeline: get a CSV name, clean that CSV, load it into SQLite."""
    # Each stage returns the file name the next stage consumes.
    csv_to_s3db(clean_csv(xlsx_to_csv()))
# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement