Guest User

Untitled

a guest
Aug 13th, 2018
125
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.29 KB | None | 0 0
  1. from dask.dataframe import read_sql_table
  2. import pandas as pd
  3. import numpy as np
  4. from sqlalchemy import create_engine, schema
  5. from config import database_config
  6.  
  7. # Copied from pandas with modifications
  8.  
  9.  
  10. def _get_dtype(column, sqltype):
  11. from sqlalchemy.types import (Integer, Float, Boolean, DateTime,
  12. Date, TIMESTAMP)
  13.  
  14. if isinstance(sqltype, Float):
  15. return float
  16. elif isinstance(sqltype, Integer):
  17. if column.nullable:
  18. return float
  19. # TODO: Refine integer size.
  20. return np.dtype('int64')
  21. elif isinstance(sqltype, TIMESTAMP):
  22. # we have a timezone capable type
  23. if not sqltype.timezone:
  24. return np.dtype('datetime64[ns]')
  25. return DatetimeTZDtype
  26. elif isinstance(sqltype, DateTime):
  27. # Caution: np.datetime64 is also a subclass of np.number.
  28. return np.dtype('datetime64[ns]')
  29. elif isinstance(sqltype, Date):
  30. return np.date
  31. elif isinstance(sqltype, Boolean):
  32. return bool
  33. return object
  34.  
  35.  
  36. def database_table_request(db_type: str, db_server: str, database: str, table: str, index_col: str = None, npartitions: int = 1):
  37. db_engine = database_config.engine(db_type)
  38. db_username = database_config.username(db_type)
  39. db_password = database_config.password(db_type)
  40.  
  41. # Get database schema using sqlalchemy reflection
  42. db_uri = f'{db_engine}://{db_username}:{db_password}@{db_server}/{database}'
  43. db_engine = create_engine(db_uri)
  44. db_metadata = schema.MetaData(bind=db_engine, reflect=True)
  45. db_tables = {k.lower(): v for k, v in db_metadata.tables.items()}
  46. db_table = db_tables[table.lower()]
  47.  
  48. # Identify the PK if it hasn't been passed
  49. for column in db_table.columns:
  50. if column.primary_key and index_col == None:
  51. index_col = column.name
  52.  
  53. # Now that we have a PK name, create an empty pandas DataFrame
  54. # for Dask meta argument
  55. pd_df = pd.DataFrame(index=None)
  56. for column in db_table.columns:
  57. if not column.name == index_col:
  58. pd_df[column.name] = pd.Series(
  59. dtype=_get_dtype(column, column.type))
  60.  
  61. # Execute query here
  62. df = read_sql_table(db_table.name, db_uri, index_col,
  63. meta=pd_df, npartitions=npartitions)
  64.  
  65. # Return dataframe
  66. return df
Add Comment
Please, Sign In to add comment