Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from sqlalchemy.ext.automap import automap_base
- from sqlalchemy.orm import Session
- from sqlalchemy import create_engine, MetaData
- from collections import namedtuple
- import hashlib
- import os
- import pickle
# Public API of this module. PEP 8 spells this ``__all__`` — the original
# ``__ALL__`` was never consulted by ``from module import *``.
__all__ = ['describe', 'Connection', 'Database']

# One ~/.pgpass entry: host:port:db:user:password.
Connection = namedtuple('Connection', ['host', 'port', 'db', 'user', 'password'])
def describe(obj, cols=None):
    """Return a mapping of an object's public attributes to their values.

    Parameters
    ----------
    obj : object
        Object to inspect (typically an automapped model instance).
    cols : iterable of str, optional
        Attribute names to include. When omitted, every non-underscore
        attribute is included except the automap machinery names.
        (Previously this parameter was accepted but silently ignored.)

    Returns
    -------
    dict
        Attribute name -> attribute value.
    """
    # SQLAlchemy automap attributes we never want to surface.
    reserved = ('metadata', 'prepare')
    if cols is None:
        cols = [prop for prop in dir(obj)
                if not prop.startswith('_') and prop not in reserved]
    return {prop: getattr(obj, prop) for prop in cols}
class Models:
    """Container for automapped classes.

    Each class is reachable as an attribute under its mapped name, and
    iterating the container yields the classes in insertion order.
    """

    def __init__(self, classes):
        # ``classes`` may be a one-shot iterator (e.g. dict.items()),
        # so materialize it exactly once.
        pairs = list(classes)
        for name, klass in pairs:
            setattr(self, name, klass)
        self._klasses = [klass for _, klass in pairs]

    def __iter__(self):
        return iter(self._klasses)
class Database:
    """Reflected-model wrapper around a PostgreSQL database.

    Exposes two attributes after construction:
      - ``self.s`` — a SQLAlchemy ``Session`` bound to the engine
      - ``self.m`` — a :class:`Models` container of the mapped classes

    Schema reflection is slow, so reflected ``Table`` objects are pickled
    under ``~/.dbconnect``, keyed by a hash of the connection URL plus
    ``__VERSION__``.
    """

    # Bump to invalidate every previously written cache file.
    __VERSION__ = '0.1'
    _CACHE_DIR = os.path.expanduser('~/.dbconnect')

    @classmethod
    def auto_discover(cls, index, **kwargs):
        """Build a Database from the ``index``-th entry in ``~/.pgpass``."""
        return cls(*cls._get_connections()[index], **kwargs)

    @classmethod
    def _get_connections(cls):
        """Parse ``~/.pgpass`` into :class:`Connection` tuples.

        Blank lines and ``#`` comments (both legal in pgpass files) are
        skipped, and the split is capped at 4 colons so a password that
        itself contains ``:`` survives intact.
        """
        connections = []
        with open(os.path.expanduser('~/.pgpass')) as pgpassfile:
            for line in pgpassfile:
                cleaned = line.strip()
                if not cleaned or cleaned.startswith('#'):
                    continue
                connections.append(Connection(*cleaned.split(':', 4)))
        return connections

    @classmethod
    def _get_hash(cls, s):
        """Return the SHA-1 hex digest of ``s`` + ``__VERSION__``.

        Used as the cache filename for a connection string.
        """
        return hashlib.sha1(''.join((s, cls.__VERSION__)).encode('utf-8')).hexdigest()

    def _get_cached_tables(self, connection_hash):
        """Return previously pickled ``Table`` objects, or ``None`` when the
        cache file is missing, truncated, or corrupt."""
        try:
            with open(os.path.join(self._CACHE_DIR, connection_hash), 'rb') as f:
                # NOTE(review): pickle.load on a cache file is acceptable only
                # because ~/.dbconnect is this user's own directory — never
                # point _CACHE_DIR at an untrusted location.
                return pickle.load(f)
        except (FileNotFoundError, EOFError, pickle.UnpicklingError):
            return None

    def _cache_tables(self, connection_hash, tables):
        """Pickle the reflected ``Table`` objects for the next run."""
        with open(os.path.join(self._CACHE_DIR, connection_hash), 'wb') as f:
            pickle.dump(tables, f)

    def __init__(self, host, port, db, user, password, debug=False, cached=True):
        """Connect, reflecting the schema live or from the on-disk cache.

        Parameters
        ----------
        host, port, db, user, password : str
            PostgreSQL connection parameters (same order as a pgpass line).
        debug : bool
            Passed to the engine as ``echo`` — log emitted SQL.
        cached : bool
            When true, try the pickled-table cache before reflecting.
        """
        self._setup_cache_dir()
        self.debug = debug
        self.db = 'postgres://{user}:{password}@{host}:{port}/{db}'.format(
            user=user, password=password, host=host, port=port, db=db)
        connection_hash = self._get_hash(self.db)

        session = models = None
        if cached:
            cached_tables = self._get_cached_tables(connection_hash)
            if cached_tables:
                session, models = self._init_existing(cached_tables)
        if session is None:
            # Cache disabled or missed: reflect live and (re)write the cache.
            # (Single fallthrough replaces the two duplicated branches the
            # original flagged with "# duplicate code".)
            session, models = self._init_automap()
            self._cache_tables(connection_hash, [m.__table__ for m in models])
        self.s, self.m = session, models

    def _init_automap(self):
        """Reflect all tables live via SQLAlchemy automap."""
        engine = create_engine(self.db, isolation_level="AUTOCOMMIT", echo=self.debug)
        base = automap_base()
        base.prepare(engine, reflect=True)
        return Session(engine), Models(base.classes.items())

    def _init_existing(self, tables):
        """Build the automap from previously pickled ``Table`` objects,
        skipping the (slow) live reflection step."""
        engine = create_engine(self.db, isolation_level="AUTOCOMMIT", echo=self.debug)
        metadata = MetaData()
        # Re-homing the tables onto a fresh MetaData is what lets automap
        # map them; the list itself is unused.
        _tables = [t.tometadata(metadata) for t in tables]  # noqa
        base = automap_base(metadata=metadata)
        base.prepare()
        return Session(engine), Models(base.classes.items())

    def _setup_cache_dir(self):
        """Create the table-cache directory if it doesn't already exist."""
        os.makedirs(self._CACHE_DIR, exist_ok=True)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement