Eelco Hoogendoorn

shelve2: persistent object-to-object mapping

Feb 22nd, 2014
"""
shelve2.py, by Eelco Hoogendoorn

This module provides two generalizations over Python's shelve.
First, it efficiently handles arbitrary (picklable) Python objects as keys.
Second, it aims to be entirely process- and thread-safe.
(Note: I'm not 100% sure this latter goal has been entirely achieved yet.)

The design is an sqlite3 database with a table containing key and value blobs,
indexed by a hash of the given key. This allows for efficient mapping of
complex Python objects to complex Python objects.

This code is adapted from the sqlitedict code by the author credited below.
"""
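
# A minimal usage sketch (not part of the original paste; 'demo.db' is a
# hypothetical path), illustrating the object-to-object mapping described above:
#
#   >>> d = Shelve('demo.db', autocommit=True)
#   >>> d[('user', 42)] = ['some', 'values', 3.14]
#   >>> d[('user', 42)]
#   ['some', 'values', 3.14]
#   >>> d.close()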

# Copyright (C) 2011 Radim Rehurek <radimrehurek@seznam.cz>

# Hacked together from:
#  * http://code.activestate.com/recipes/576638-draft-for-an-sqlite3-based-dbm/
#  * http://code.activestate.com/recipes/526618/
#
# Use the code in any way you like (at your own risk), it's public domain.

"""
A lightweight wrapper around Python's sqlite3 database, with a dict-like interface
and multi-thread access support::

>>> mydict = SqliteDict('some.db', autocommit=True) # the mapping will be persisted to file `some.db`
>>> mydict['some_key'] = any_picklable_object
>>> print mydict['some_key']
>>> print len(mydict) # etc... all dict functions work

Pickle is used internally to serialize the values. In the original sqlitedict,
keys are strings; in this adaptation, keys are pickled as well, so any picklable
object can be used as a key.

If you don't use autocommit (the default is no autocommit, for performance),
don't forget to call `mydict.commit()` when done with a transaction.

"""


import sqlite3
import os
import logging
from cPickle import dumps, loads, HIGHEST_PROTOCOL as PICKLE_PROTOCOL
from UserDict import DictMixin
from Queue import Queue
from threading import Thread

import numpy as np
import hashlib


logger = logging.getLogger('sqlitedict')


def open(*args, **kwargs):
    """See documentation of the Shelve class."""
    return Shelve(*args, **kwargs)


def encode(obj):
    """Serialize an object using pickle to a binary format accepted by SQLite."""
    return sqlite3.Binary(dumps(obj, protocol=PICKLE_PROTOCOL))


def decode(obj):
    """Deserialize objects retrieved from SQLite."""
    return loads(str(obj))

def hash_str_to_u64(strobj):
    """Fold the 256-bit SHA-256 digest of `strobj` into a single 64-bit value
    by XOR-ing its four 64-bit words together, then adding one."""
    return reduce(np.bitwise_xor, np.frombuffer(hashlib.sha256(strobj).digest(), dtype=np.uint64)) + 1

def process_key(key):
    """Turn an arbitrary picklable key into a (pickle blob, 64-bit hash) pair."""
    keystr = encode(key)
    keyhash = hash_str_to_u64(keystr)
    return keystr, keyhash

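# For illustration (a sketch, not from the original paste): every key is reduced
# to a (pickle-blob, hash) pair before it touches the database, e.g.
#
#   >>> blob, h = process_key(('user', 42))   # blob: pickled key, h: its 64-bit hash
#
# Note that key equality is effectively "pickles to the same bytes": getrowid()
# and getitem() below compare stored key blobs, not the original objects.
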
class Shelve(object, DictMixin):
    def __init__(self, filename=None, flag='c',
                 autocommit=False, journal_mode="DELETE"):
        """
        Initialize a thread-safe sqlite-backed dictionary. The mapping is stored
        in a single table named `dict` in the database file `filename`.

        The original sqlitedict fell back to a random temporary file when no
        `filename` was given; this adaptation does not, so always supply one.

        If you enable `autocommit`, changes will be committed after each operation
        (less efficient but safer). Otherwise, changes are committed on `self.commit()`,
        `self.clear()` and `self.close()`.

        Set `journal_mode` to 'OFF' if you're experiencing sqlite I/O problems
        or if you need performance and don't care about crash-consistency.

        The `flag` parameter:
          'c': default mode, open for read/write, creating the db/table if necessary.
          'w': open for read/write, but drop the table contents first (start empty).
          'n': create a new database, erasing any existing file first.

        """
        if flag == 'n':
            if os.path.exists(filename):
                os.remove(filename)

        self.filename = filename

##        logger.info("opening Sqlite table 'dict' in %s" % filename)
        self.conn = SqliteMultithread(filename, autocommit=autocommit, journal_mode=journal_mode)

        # note: `hash` is not a primary key; distinct keys may share a hash, and
        # collisions are resolved by comparing the stored key blob
##        MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS dict (hash INTEGER PRIMARY KEY, key BLOB, value BLOB)'
        MAKE_TABLE = 'CREATE TABLE IF NOT EXISTS dict (hash INT NOT NULL, key BLOB, value BLOB)'
        self.conn.execute(MAKE_TABLE)
        MAKE_INDEX = 'CREATE INDEX IF NOT EXISTS `id` ON `dict` (`hash` ASC)'
        self.conn.execute(MAKE_INDEX)
        self.conn.commit()

        if flag == 'w':
            self.clear()

    def __str__(self):
#        return "Shelve(%i items in %s)" % (len(self), self.conn.filename)
        return "Shelve(%s)" % (self.conn.filename)

    def __len__(self):
        # `SELECT COUNT(*)` is slow in sqlite (it does a linear scan!), so len()
        # is slow too once the table grows beyond trivial size. We could keep the
        # row count ourselves by means of triggers, but that seems too complicated
        # and would slow down normal operation (insert/delete etc.).
        GET_LEN = 'SELECT COUNT(*) FROM dict'
        rows = self.conn.select_one(GET_LEN)[0]
        return rows if rows is not None else 0

    def __bool__(self):
        # MAX(ROWID) comes back as a single row containing NULL when the table is empty
        GET_MAX = 'SELECT MAX(ROWID) FROM dict'
        return self.conn.select_one(GET_MAX)[0] is not None
    __nonzero__ = __bool__  # Python 2 spelling of the truth-value hook

    def iterkeys(self):
        GET_KEYS = 'SELECT key FROM dict ORDER BY rowid'
        for key in self.conn.select(GET_KEYS):
            yield decode(key[0])

    def itervalues(self):
        GET_VALUES = 'SELECT value FROM dict ORDER BY rowid'
        for value in self.conn.select(GET_VALUES):
            yield decode(value[0])

    def iteritems(self):
        GET_ITEMS = 'SELECT key, value FROM dict ORDER BY rowid'
        for key, value in self.conn.select(GET_ITEMS):
            yield decode(key), decode(value)


    def getrowid(self, key, keystr, keyhash):
        """Return the rowid of `key`, comparing stored key blobs to resolve
        hash collisions; raise KeyError if the key is not present."""
        GET_ITEM = 'SELECT rowid, key FROM dict WHERE hash = ?'
        keys = self.conn.select(GET_ITEM, (keyhash,))
        for rowid, storedkey in keys:
            if storedkey == keystr:
                return rowid
        raise KeyError(key)

    def __contains__(self, key):
        try:
            self.getrowid(key, *process_key(key))
            return True
        except KeyError:
            return False

    def __getitem__(self, key):
        return self.getitem(key, *process_key(key))

    def getitem(self, key, keystr, keyhash):
        GET_ITEM = 'SELECT key, value FROM dict WHERE hash = ?'
        items = self.conn.select(GET_ITEM, (keyhash,))
        for storedkey, value in items:
            if keystr == storedkey:
                return decode(value)
        raise KeyError(key)

    def __setitem__(self, key, value):
        return self.setitem(key, value, *process_key(key))

    def setitem(self, key, value, keystr, keyhash):
        valuestr = encode(value)
        try:
            # key already present: overwrite its row in place
            rowid = self.getrowid(key, keystr, keyhash)
            ADD_ITEM = 'REPLACE INTO dict (rowid, hash, key, value) VALUES (?,?,?,?)'
            self.conn.execute(ADD_ITEM, (rowid, keyhash, keystr, valuestr))
        except KeyError:
            # new key: insert a fresh row
            ADD_ITEM = 'INSERT INTO dict (hash, key, value) VALUES (?,?,?)'
            self.conn.execute(ADD_ITEM, (keyhash, keystr, valuestr))

    def __delitem__(self, key):
        self.delitem(key, *process_key(key))

    def delitem(self, key, keystr, keyhash):
        rowid = self.getrowid(key, keystr, keyhash)
        DEL_ITEM = 'DELETE FROM dict WHERE rowid = ?'
        self.conn.execute(DEL_ITEM, (rowid,))


    def update(self, items=(), **kwds):
        def dummy(item):
            key, value = item
            keystr = encode(key)
            keyhash = hash_str_to_u64(keystr)
            valuestr = encode(value)
            return keyhash, keystr, valuestr

        try:
            items = items.iteritems()
        except AttributeError:
            pass    # not a dict; assume an iterable of (key, value) pairs
        items = map(dummy, items)

        # note: with no UNIQUE constraint on `hash`, REPLACE behaves like a plain INSERT
        # here, so update() on keys that already exist can leave duplicate rows behind
        UPDATE_ITEMS = 'REPLACE INTO dict (hash, key, value) VALUES (?, ?, ?)'
        self.conn.executemany(UPDATE_ITEMS, items)
        if kwds:
            self.update(kwds)

    def keys(self):
        return list(self.iterkeys())

    def values(self):
        return list(self.itervalues())

    def items(self):
        return list(self.iteritems())

    def __iter__(self):
        return self.iterkeys()

    def clear(self):
        CLEAR_ALL = 'DELETE FROM dict;' # avoid VACUUM, as it gives "OperationalError: database schema has changed"
        self.conn.commit()
        self.conn.execute(CLEAR_ALL)
        self.conn.commit()

    def commit(self):
        if self.conn is not None:
            self.conn.commit()
    sync = commit

    def close(self):
        logger.debug("closing %s" % self)
        if self.conn is not None:
            if self.conn.autocommit:
                self.conn.commit()
            self.conn.close()
            self.conn = None

    def terminate(self):
        """Delete the underlying database file. Use with care."""
        self.close()
        logger.info("deleting %s" % self.filename)
        try:
            os.remove(self.filename)
        except (IOError, OSError), e:
            logger.warning("failed to delete %s: %s" % (self.filename, e))

    def __del__(self):
        # like close(), but assume globals are gone by now (such as the logger)
        try:
            if self.conn is not None:
                if self.conn.autocommit:
                    self.conn.conn.commit()
                self.conn.conn.close()
                self.conn = None
        except:
            pass
#endclass Shelve



class SqliteMultithread(Thread):
    """
    Wrap an sqlite connection in a way that allows concurrent requests from multiple threads.

    This is done by internally queueing the requests and processing them sequentially
    in a separate thread (in the same order they arrived).

    """
    def __init__(self, filename, autocommit, journal_mode):
        super(SqliteMultithread, self).__init__()
        self.filename = filename
        self.autocommit = autocommit
        self.journal_mode = journal_mode
        self.reqs = Queue() # use request queue of unlimited size
        self.setDaemon(True) # python2.5-compatible
        self.start()

    def run(self):
        if self.autocommit:
            conn = sqlite3.connect(self.filename, isolation_level=None, check_same_thread=False)
        else:
            conn = sqlite3.connect(self.filename, check_same_thread=False)
        conn.execute('PRAGMA journal_mode = %s' % self.journal_mode)
        conn.text_factory = str
        cursor = conn.cursor()
        cursor.execute('PRAGMA synchronous=OFF')
        while True:
            req, arg, res = self.reqs.get()
            if req == '--close--':
                break
            elif req == '--commit--':
                conn.commit()
            else:
                cursor.execute(req, arg)
                if res:
                    for rec in cursor:
                        res.put(rec)
                    res.put('--no more--')
                if self.autocommit:
                    conn.commit()
        conn.close()

    def execute(self, req, arg=None, res=None):
        """
        `execute` calls are non-blocking: just queue up the request and return immediately.

        """
        self.reqs.put((req, arg or tuple(), res))

    def executemany(self, req, items):
        for item in items:
            self.execute(req, item)

    def select(self, req, arg=None):
        """
        Unlike sqlite's native select, this select doesn't handle iteration efficiently.

        The worker thread starts pushing rows onto the result queue as soon as the
        request is dequeued, and although you can iterate over the result normally
        (`for res in self.select(): ...`), the entire result set ends up in memory.

        """
        res = Queue() # results of the select will appear as items in this queue
        self.execute(req, arg, res)
        while True:
            rec = res.get()
            if rec == '--no more--':
                break
            yield rec

    def select_one(self, req, arg=None):
        """Return only the first row of the SELECT, or None if there are no matching rows."""
        try:
            return iter(self.select(req, arg)).next()
        except StopIteration:
            return None

    def commit(self):
        self.execute('--commit--')

    def close(self):
        self.execute('--close--')
#endclass SqliteMultithread
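

# A small sketch (not in the original paste; the helper name and db path are
# hypothetical) of the pattern SqliteMultithread enables: several threads share
# one Shelve, and all SQL is funnelled through the single worker via its queue.
def _example_concurrent_writes(path='demo_threads.db'):
    d = Shelve(path, autocommit=True)

    def writer(offset):
        # each thread writes its own disjoint set of keys, so no two threads race
        # on the same key's select-then-insert sequence
        for i in range(10):
            d[(offset, i)] = i * i

    threads = [Thread(target=writer, args=(n,)) for n in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert len(d) == 40
    d.close()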


# running shelve2.py as a script will perform a simple unit test
if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(module)s:%(lineno)d : %(funcName)s(%(threadName)s) : %(message)s')
    logging.root.setLevel(level=logging.INFO)
    for d in Shelve(r'c:\docs\testdb'), Shelve(r'c:\docs\testdb2', flag='n'):
        assert list(d) == []
        assert len(d) == 0
        assert not d
        d['abc'] = 'rsvp' * 100
        assert d['abc'] == 'rsvp' * 100
        assert len(d) == 1
        d['abc'] = 'lmno'
        assert d['abc'] == 'lmno'
        assert len(d) == 1
        del d['abc']
        assert not d
        assert len(d) == 0
        d['abc'] = 'lmno'
        d['xyz'] = 'pdq'
        assert len(d) == 2
        assert list(d.iteritems()) == [('abc', 'lmno'), ('xyz', 'pdq')]
        assert d.items() == [('abc', 'lmno'), ('xyz', 'pdq')]
        assert d.values() == ['lmno', 'pdq']
        assert d.keys() == ['abc', 'xyz']
        assert list(d) == ['abc', 'xyz']
        d.update(p='x', q='y', r='z')
        assert len(d) == 5
        assert d.items() == [('abc', 'lmno'), ('xyz', 'pdq'), ('q', 'y'), ('p', 'x'), ('r', 'z')]
        del d['abc']
        try:
            error = d['abc']
        except KeyError:
            pass
        else:
            assert False
        try:
            del d['abc']
        except KeyError:
            pass
        else:
            assert False
        assert list(d) == ['xyz', 'q', 'p', 'r']
        assert d
        d.clear()
        assert not d
        assert list(d) == []
        d.update(p='x', q='y', r='z')
        assert list(d) == ['q', 'p', 'r']
        d.clear()
        assert not d
        d.close()
    print 'all tests passed :-)'