import os
import sys

import pandas as pd
from sqlalchemy import create_engine, text
from multiprocessing.pool import ThreadPool
from threading import Lock
# Load the data from CSV into a DataFrame.
df = pd.read_csv('../bases/clientes.csv', sep=';')
df = df.set_index('cpf')
# Slice the DataFrame into batches of up to n rows each.
def get_batches(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]
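# For example (hypothetical toy data, not from the CSV above):
# list(get_batches([1, 2, 3, 4, 5], 2)) yields [[1, 2], [3, 4], [5]];
# with a DataFrame, the same slicing applies row-wise.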
# DB connection settings. In production, read these from environment variables.
db_name = 'hospital'
tb_name = 'clientes'
db_host = '127.0.0.1'
db_user = 'root'
db_pass = 'password'
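# A minimal sketch of the environment-variable approach mentioned above. The
# variable names (DB_HOST, DB_USER, DB_PASS) are assumptions, not something
# this script defines; the hardcoded values above remain the defaults.
db_host = os.environ.get('DB_HOST', db_host)
db_user = os.environ.get('DB_USER', db_user)
db_pass = os.environ.get('DB_PASS', db_pass)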
# Ensure the database exists. The bare mysql:// URL uses SQLAlchemy's default
# MySQL driver (mysqlclient); with PyMySQL, use mysql+pymysql:// instead.
engine = create_engine(f'mysql://{db_user}:{db_pass}@{db_host}/mysql')
with engine.begin() as conn:
    conn.execute(text(f'CREATE DATABASE IF NOT EXISTS {db_name}'))
# Point the engine at the new database.
engine = create_engine(f'mysql://{db_user}:{db_pass}@{db_host}/{db_name}')
# Materialize the batches, then set up a lock-protected progress counter.
batches = list(get_batches(df, 100_000))
n_batches = len(batches)
counter = 0
counter_lock = Lock()
# Print a progress message; callers must hold counter_lock.
def one_more():
    global counter
    counter += 1
    msg = f'\rFinished {counter} of {n_batches} batches ({round(counter / n_batches * 100, 2)}%).'
    sys.stdout.write(msg)
    sys.stdout.flush()
# Insert one batch and report progress.
def insert(batch):
    with engine.begin() as conn:
        batch.to_sql(tb_name, conn, if_exists='append', chunksize=10_000)
    with counter_lock:
        one_more()
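# If network round-trips dominate, to_sql inside insert() above also accepts
# method='multi' to pack several rows into each INSERT statement; whether that
# helps here is untested, so it is noted only as an alternative:
# batch.to_sql(tb_name, conn, if_exists='append', chunksize=10_000, method='multi')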
# Run the inserts on a thread pool. Threads (not separate processes) are used
# so the global counter, the lock, and the engine's connection pool are shared.
THREADS = 4
with ThreadPool(THREADS) as p:
    p.map(insert, batches)
print('\nAll Done!')
# A little test: read back a few rows.
with engine.begin() as conn:
    sample = pd.read_sql(f'select * from {tb_name} limit 10', conn, index_col='cpf')
print(sample)