Advertisement
Guest User

Untitled

a guest
Jun 30th, 2017
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.65 KB | None | 0 0
  1. import asyncio
  2.  
  3. import aiomysql
  4. from aiomysql.cursors import DictCursor
  5.  
  6.  
  7. class MysqlWrapper:
  8. def __init__(self, host, port, user, password, db):
  9. self.host = host
  10. self.port = port
  11. self.user = user
  12. self.password = password
  13. self.db = db
  14. self._pool = None
  15.  
  16. async def pool(self):
  17. if not self._pool:
  18. self._pool = await aiomysql.create_pool(
  19. host=self.host, port=self.port, user=self.user,
  20. password=self.password, db=self.db
  21. )
  22. return self._pool
  23.  
  24. async def close(self):
  25. if not self._pool:
  26. return
  27. self._pool.close()
  28. await self._pool.wait_closed()
  29.  
  30. mysql = MysqlWrapper(
  31. host='localhost', port=3306, user='root',
  32. password='', db='mysql'
  33. )
  34.  
  35.  
  36. def mysql_context(wrapper):
  37. def _mysql_context(func):
  38. async def __mysql_context(*args, **kwargs):
  39. pool = await wrapper.pool()
  40. async with pool.acquire() as conn:
  41. await conn.set_charset('utf8')
  42. r = await func(conn=conn, *args, **kwargs)
  43. await conn.commit()
  44. return r
  45. return __mysql_context
  46. return _mysql_context
  47.  
  48.  
  49. @mysql_context(mysql)
  50. async def mysql_test(conn=None):
  51. async with conn.cursor(DictCursor) as cur:
  52. await cur.execute('SELECT Host,User FROM user')
  53. print(cur.rowcount)
  54. print(await cur.fetchone())
  55. print(await cur.fetchall())
  56.  
  57.  
  58. async def close_pool():
  59. await mysql.close()
  60. print('Close mysql pool')
  61.  
  62.  
  63. loop = asyncio.get_event_loop()
  64. loop.run_until_complete(mysql_test())
  65. loop.run_until_complete(close_pool())
  66. loop.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement