Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- #encoding=utf-8
- """
- A lightweight wrapper around mysql.connector based on torndb.
- """
- from __future__ import absolute_import, division, with_statement
- import logging
- import time
- from mysql import connector
- class Connection(object):
- """A lightweight wrapper around mysql.connector DB-API connections.
- The main value we provide is wrapping rows in a dict/object so that
- columns can be accessed by name. Typical usage::
- db = Connection("localhost", "mydatabase")
- for article in db.query("SELECT * FROM articles"):
- print article.title
- """
- def __init__(self, database, host="127.0.0.1:3306", user=None, password=None,
- max_idle_time=7 * 3600, autocommit=True, **kwargs):
- self.host = host
- self.database = database
- self.max_idle_time = float(max_idle_time)
- args = dict(database=database, autocommit=autocommit, **kwargs)
- if user is not None:
- args["user"] = user
- if password is not None:
- args["password"] = password
- # We accept a path to a MySQL socket file or a host(:port) string
- if "/" in host:
- args["unix_socket"] = host
- else:
- self.socket = None
- pair = host.split(":")
- if len(pair) == 2:
- args["host"] = pair[0]
- args["port"] = int(pair[1])
- else:
- args["host"] = host
- args["port"] = 3306
- self._db = None
- self._db_args = args
- self._last_use_time = time.time()
- try:
- self.reconnect()
- except Exception:
- logging.error("Cannot connect to MySQL on %s", self.host,
- exc_info=True)
- @property
- def cursor(self):
- return self._cursor()
- def close(self):
- """Closes this database connection."""
- if getattr(self, "_db", None) is not None:
- self._db.close()
- self._db = None
- def reconnect(self):
- """Closes the existing database connection and re-opens it."""
- self.close()
- self._db = connector.connect(**self._db_args)
- def iter(self, query, *parameters, **kwparameters):
- """Returns an iterator for the given query and parameters."""
- self._ensure_connected()
- cursor = self._cursor(buffered=True)
- try:
- self._execute(cursor, query, parameters, kwparameters)
- column_names = [d[0] for d in cursor.description]
- for row in cursor:
- yield Row(zip(column_names, row))
- finally:
- cursor.close()
- def query(self, query, *parameters, **kwparameters):
- """Returns a row list for the given query and parameters."""
- cursor = self._cursor()
- try:
- self._execute(cursor, query, parameters, kwparameters)
- column_names = [d[0] for d in cursor.description]
- return [Row(zip(column_names, row)) for row in cursor]
- finally:
- cursor.close()
- def get(self, query, *parameters, **kwparameters):
- """Returns the (singular) row returned by the given query.
- If the query has no results, returns None. If it has
- more than one result, raises an exception.
- """
- rows = self.query(query, *parameters, **kwparameters)
- if not rows:
- return None
- elif len(rows) > 1:
- raise Exception("Multiple rows returned for Database.get() query")
- else:
- return rows[0]
- # rowcount is a more reasonable default return value than lastrowid,
- # but for historical compatibility execute() must return lastrowid.
- def execute(self, query, *parameters, **kwparameters):
- """Executes the given query, returning the lastrowid from the query."""
- return self.execute_lastrowid(query, *parameters, **kwparameters)
- def execute_lastrowid(self, query, *parameters, **kwparameters):
- """Executes the given query, returning the lastrowid from the query."""
- cursor = self._cursor()
- try:
- self._execute(cursor, query, parameters, kwparameters)
- return cursor.lastrowid
- finally:
- cursor.close()
- def execute_rowcount(self, query, *parameters, **kwparameters):
- """Executes the given query, returning the rowcount from the query."""
- cursor = self._cursor()
- try:
- self._execute(cursor, query, parameters, kwparameters)
- return cursor.rowcount
- finally:
- cursor.close()
- def executemany(self, query, parameters):
- """Executes the given query against all the given param sequences.
- We return the lastrowid from the query.
- """
- return self.executemany_lastrowid(query, parameters)
- def executemany_lastrowid(self, query, parameters):
- """Executes the given query against all the given param sequences.
- We return the lastrowid from the query.
- """
- cursor = self._cursor()
- try:
- cursor.executemany(query, parameters)
- return cursor.lastrowid
- finally:
- cursor.close()
- def executemany_rowcount(self, query, parameters):
- """Executes the given query against all the given param sequences.
- We return the rowcount from the query.
- """
- cursor = self._cursor()
- try:
- cursor.executemany(query, parameters)
- return cursor.rowcount
- finally:
- cursor.close()
- update = delete = execute_rowcount
- updatemany = executemany_rowcount
- insert = execute_lastrowid
- insertmany = executemany_lastrowid
- def _ensure_connected(self):
- # Mysql by default closes client connections that are idle for
- # 8 hours, but the client library does not report this fact until
- # you try to perform a query and it fails. Protect against this
- # case by preemptively closing and reopening the connection
- # if it has been idle for too long (7 hours by default).
- if (self._db is None or
- (time.time() - self._last_use_time > self.max_idle_time)):
- self.reconnect()
- self._last_use_time = time.time()
- def _cursor(self, buffered=False):
- self._ensure_connected()
- return self._db.cursor(buffered=buffered)
- def _execute(self, cursor, query, parameters, kwparameters):
- try:
- return cursor.execute(query, kwparameters or parameters)
- except connector.errors.OperationalError:
- logging.error("Error connecting to MySQL on %s", self.host)
- self.close()
- raise
- def __del__(self):
- self.close()
- class Row(dict):
- """A dict that allows for object-like property access syntax."""
- def __getattr__(self, name):
- try:
- return self[name]
- except KeyError:
- raise AttributeError(name)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement