Guest User

Untitled

a guest
Sep 13th, 2018
116
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.10 KB | None | 0 0
  1. import logging
  2.  
  3. from sqlalchemy import create_engine, MetaData, Table
  4. from sqlalchemy.engine import reflection
  5. from sqlalchemy.orm import mapper, sessionmaker
  6.  
  7. logger = logging.getLogger(__name__)
  8.  
  9.  
  10. def get_session(host=None, port=3306, db=None,
  11. user=None, password=None, charset='utf8mb4'):
  12. """
  13. initial mapping with class and db tables
  14. :param host: mysql db host
  15. :param port: mysql db port
  16. :param db: database name
  17. :param user: user name
  18. :param password: user password
  19. :param charset: charset
  20. :return: DBase class with session attribute and table attributes
  21. """
  22. engine = create_engine(
  23. 'mysql+pymysql://%s:%s@%s:%d/%s?charset=%s' %
  24. (user, password, host, port, db, charset))
  25. metadata = MetaData(engine)
  26. session = sessionmaker(bind=engine)()
  27. insp = reflection.Inspector.from_engine(engine)
  28. logger.info("Connect to DB: %s@%s" % (db, host))
  29.  
  30. # create table class for mapper, then map table to class
  31. classes = dict()
  32. for t in insp.get_table_names():
  33. table = Table(t, metadata, autoload=True)
  34. new_class = type(
  35. ''.join([str(t), '_', db]),
  36. (Model,), {
  37. 't': table,
  38. 's': session,
  39. 'insp': insp
  40. })
  41. mapper(new_class, table)
  42.  
  43. instance = new_class()
  44. classes[t] = instance
  45. logger.debug("Map table %s to %s" % (t, new_class))
  46.  
  47. db_session = DBase(engine, session, **classes)
  48. return db_session
  49.  
  50.  
  51. class DBase(object):
  52. def __init__(self, engine, session, **kwargs):
  53. self.engine = engine
  54. self.session = session
  55. for key, val in kwargs.items():
  56. setattr(self, key, val)
  57.  
  58. def rollback(self):
  59. self.session.rollback()
  60.  
  61. def close(self):
  62. self.session.close()
  63.  
  64. def execute(self, sql):
  65. try:
  66. with self.engine.connect() as con:
  67. return con.execute(sql)
  68. except:
  69. self.rollback()
  70. return None
  71.  
  72.  
  73. class Model(object):
  74. def __init__(self, **kwargs):
  75. for key, val in kwargs.items():
  76. setattr(self, key, val)
  77. if self.t.name:
  78. self.objects = QuerySet(
  79. self.__class__, self.t.name, self.s, self.insp)
  80.  
  81.  
  82. class QuerySet(object):
  83. def __init__(self, model, name, session, inspector):
  84. self.s = session
  85. self.insp = inspector
  86. self.table_name = name
  87. self.model = model
  88. self.logger = logging.getLogger(__name__)
  89.  
  90. def get(self, **kwargs):
  91. return self.s.query(self.model).filter_by(**kwargs).first()
  92.  
  93. def filter(self, **kwargs):
  94. return self.s.query(self.model).filter_by(**kwargs)
  95.  
  96. def get_or_create(self, default={}, **kwargs):
  97. """
  98. :param default: additional attributes if create
  99. :param kwargs: attributes for query existence
  100. :return: instance
  101. """
  102. instance = self.get(**kwargs)
  103. if instance is None:
  104. return self.create(**kwargs, **default), True
  105. return instance, False
  106.  
  107. def create(self, **kwargs):
  108. instance = self.model(**kwargs)
  109. self.s.add(instance)
  110. self.save()
  111. return instance
  112.  
  113. def all(self):
  114. return self.s.query(self.model).all()
  115.  
  116. def save(self):
  117. try:
  118. self.s.commit()
  119. except:
  120. self.s.rollback()
  121. raise ValueError('can not save instance')
  122.  
  123. def delete(self, **kwargs):
  124. instance = self.model(**kwargs)
  125. self.s.delete(instance)
  126. self.save()
  127.  
  128.  
  129. if __name__ == '__main__':
  130. db = get_session(host='localhost', db='db', user='root', password="xxx")
  131.  
  132. # create a user
  133. db.auth_user.objects.create(username='admin')
  134.  
  135. # get one instance
  136. print(db.auth_user.objects.get(username='admin').id)
  137.  
  138. # filter query
  139. for u in db.auth_user.objects.filter(is_active=1):
  140. print(u.username)
  141.  
  142. # get or create a user
  143. user, _ = db.auth_user.objects.get_or_create(username='admin')
  144.  
  145. # update fields
  146. user.first_name = 'superman'
  147. db.auth_user.objects.save()
  148. print(db.auth_user.objects.get(username='admin').first_name)
Add Comment
Please, Sign In to add comment