import sys
import pandas as pd
from sqlalchemy import create_engine, text
from multiprocessing.dummy import Pool  # thread-based Pool, so the Lock and counter below are actually shared
from threading import Lock

# Get the data from CSV into a DataFrame
df = pd.read_csv('../bases/clientes.csv', sep=';')
df = df.set_index('cpf')

# Create batches by slicing the df into chunks of n rows
def get_batches(iterable, n=1):
    l = len(iterable)
    for ndx in range(0, l, n):
        yield iterable[ndx:min(ndx + n, l)]

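# A tiny, throwaway check of the batching (the 10-row frame below is an
# assumption of this sketch, not part of the load): batches of 4 come out
# as frames of 4, 4 and 2 rows.
_demo = pd.DataFrame({'x': range(10)})
assert [len(b) for b in get_batches(_demo, 4)] == [4, 4, 2]
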
# DB connection settings. In prod, get them from $ENV.
db_name = 'hospital'
tb_name = 'clientes'
db_host = '127.0.0.1'
db_user = 'root'
db_pass = 'password'

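# In production, read the credentials from the environment instead, as the
# comment above says; a minimal sketch (the DB_HOST/DB_USER/DB_PASS names
# are an assumption of this sketch, pick your own):
import os
db_host = os.getenv('DB_HOST', db_host)
db_user = os.getenv('DB_USER', db_user)
db_pass = os.getenv('DB_PASS', db_pass)
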
# Ensure the DB exists
engine = create_engine(f'mysql://{db_user}:{db_pass}@{db_host}/mysql')
with engine.begin() as conn:
    conn.execute(text(f'CREATE DATABASE IF NOT EXISTS {db_name}'))

# Use the new DB as default
engine = create_engine(f'mysql://{db_user}:{db_pass}@{db_host}/{db_name}')

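# (A bare 'mysql://' URL makes SQLAlchemy pick the default MySQLdb driver;
# with PyMySQL installed, the URL would be 'mysql+pymysql://...' instead.)
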
# Feed the batches generator and get a list.
# Also set up a lock for the thread-safe progress counter.
batches = list(get_batches(df, 100_000))
n_batches = len(batches)
counter = 0
counter_lock = Lock()

# Report progress; must be called while holding counter_lock.
def one_more():
    global counter
    counter += 1
    msg = f'\rFinished {counter} of {n_batches} batches ({round(counter / n_batches * 100, 2)}%).'
    sys.stdout.write(msg)
    sys.stdout.flush()

# Insert one batch and print the progress.
def insert(batch):
    with engine.begin() as conn:
        batch.to_sql(tb_name, conn, if_exists='append', chunksize=10_000)
    with counter_lock:
        one_more()

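# Design note: chunksize=10_000 only splits how many rows pandas sends per
# round of INSERTs. If round-trips dominate, recent pandas also accepts
# method='multi' to pack many rows into each INSERT statement; a sketch of
# the same call with that option (not benchmarked here):
#
#   batch.to_sql(tb_name, conn, if_exists='append', chunksize=10_000, method='multi')
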
# Define the worker threads and run.
THREADS = 4
with Pool(THREADS) as p:
    p.map(insert, batches)
print('\nAll Done!')

# A little test: read back a few rows.
with engine.begin() as conn:
    sample = pd.read_sql(f'select * from {tb_name} limit 10', conn, index_col='cpf')
print(sample)
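
# Optional sanity check, a sketch assuming the load above completed: the
# table should now hold exactly as many rows as the CSV.
with engine.begin() as conn:
    n_rows = conn.execute(text(f'select count(*) from {tb_name}')).scalar()
print(f'{n_rows} rows in {db_name}.{tb_name} (the CSV had {len(df)}).')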