EelcoHoogendoorn

python disc caching recipe

Feb 19th, 2014

"""
cross platform thread and process safe caching of python objects to disk

tempfile gives cross platform storage
sqlitedict gives process and thread safe transactions
good old pickle allows storing arbitrary keys and values

subclassing Cache.cached allows implementation of the cached operation;
this could be a compilation, and a mapping of the compilation result
to a format importable by python, for instance
"""
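
# intended usage, as a sketch; MyCache and expensive_computation below are
# placeholder names, and the CompilationCache class at the bottom of this file
# is a runnable version of the same pattern:
#
#   class MyCache(Cache):
#       def cached(self, key):
#           return expensive_computation(key)   # normally runs only once per key on this machine
#
#   mycache = MyCache('mycache_v1')
#   value = mycache[key]    # transparently computed, or fetched from disk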
  14.  
  15. import os
  16.  
  17. import tempfile
  18. from sqlitedict import SqliteDict
  19. import cPickle as Pickle
  20. from time import clock, sleep, time
  21.  
  22. import threading
  23. import collections
  24. locks = collections.defaultdict(threading.Lock)
  25.  
  26.  
  27. temppath = tempfile.gettempdir()
  28. cachepath = os.path.join(temppath, 'pycache')
  29. try:
  30.     os.mkdir(cachepath)
  31. except:
  32.     pass
  33.  
  34. import datetime
  35. datetime.datetime
  36.  
class Deferred(object):
    """timestamped token, stored in the cache while a value is being computed,
    so concurrent readers wait for the pending result rather than recompute it"""
    def __init__(self):
        self.stamp = time()
    def expired(self, timeout):
        # a token older than timeout (or with a negative age, which can happen
        # if the system clock jumps) is considered abandoned by its producer
        dt = time() - self.stamp
        return dt > timeout or dt < 0

  45.  
  46. class Cache(object):
  47.  
  48.     def __init__(self, identifier, deferred_timeout = 10):
  49.         self.identifier         = identifier                                #name of database file in which to store
  50.         self.deferred_timeout   = deferred_timeout                          #timeout in seconds to wait for pending action before starting a new one
  51.         self.filename           = os.path.join(cachepath, identifier)
  52.         self.sql                = SqliteDict(self.filename, autocommit=True)
  53.         self.locks              = collections.defaultdict(threading.Lock)
  54.  
  55.  
  56.     def __getitem__(self, key):
  57.         keystr = Pickle.dumps(key, -1)
  58.  
  59.         with self.locks[keystr]:
  60.             while True:
  61.                 try:
  62.                     value = self.sql[keystr]
  63.                     if isinstance(value, Deferred):
  64.                         if value.expired(self.deferred_timeout):
  65.                             raise Exception()
  66.                         sleep(0.01)
  67.                     else:
  68.                         return value
  69.                 except:
  70.                     self.sql[keystr] = Deferred()
  71.                     value = self.cached(key)
  72.                     self.sql[keystr] = value
  73.                     return value
  74.  
  75.     def __delitem__(self, key):
  76.         keystr = Pickle.dumps(key, -1)
  77.         with self.lock[keystr]:
  78.             del self.sql[keystr]
  79.  
  80.     def cached(self, key):
  81.         raise NotImplementedError()
  82.  
  83.  
  84.  
"""
client code starts here;
from pycache import Cache
"""


import numpy as np

class CompilationCache(Cache):
    """
    subclass implements the actual cached operation
    """
    def cached(self, source):
        # stand-in for an expensive operation, such as compiling a function
        n, s = source

        print 'compiling'
        sleep(3)
        q = np.array(list(s * n))
        return np.sort(q).tostring()


cache = CompilationCache('theano_v0.6')

def worker(arg):
    value = cache[arg]
    return value


if __name__ == '__main__':

    # test compiling the same function many times, or compiling different functions concurrently
    args = [(3, 'The quick brown fox, and so on \n aaa')] * 4
    args = list(enumerate(['The quick brown fox, and so on \n aaa'] * 4))

    # run multiple jobs concurrently, as either processes or threads
    use_threads = False     # renamed so we do not shadow the threading module
    if use_threads:
        import multiprocessing.dummy as multiprocessing
    else:
        import multiprocessing

    pool = multiprocessing.Pool(4)
    print list(pool.map(worker, args))
##    print cache[(3,'a')]
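
A minimal client-side sketch, assuming the recipe above is saved as pycache.py (the module name its docstring suggests); SquareCache, 'squares_v1', and the sleep-based fake workload are illustrative placeholders, not part of the recipe:

from time import sleep
from pycache import Cache

class SquareCache(Cache):
    """caches the result of a deliberately slow squaring operation"""
    def cached(self, key):
        sleep(2)            # pretend this is expensive, e.g. a compilation
        return key ** 2

if __name__ == '__main__':
    cache = SquareCache('squares_v1')
    print cache[7]          # slow the first time; computed and written to disk
    print cache[7]          # fast; fetched from the sqlite file in the temp dir
    del cache[7]            # drop the entry so it will be recomputed next time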