Advertisement
EelcoHoogendoorn

pycache

Feb 22nd, 2014
169
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.78 KB | None | 0 0
  1.  
  2. """
  3. cross platform thread and process safe caching of python objects to disk
  4.  
  5.  
  6. subclassing cache.cached allow implementation of the cached operation
  7. this could be a compilation, and a mapping of the compilation result
  8. to a format importable by python, for instance
  9.  
  10. every cache file has an environment description pickled into it.
  11. this should encapsulate all machine state relevant to the cached process
  12.  
  13.  
  14. the intended use case is for various stages of compilation caching
  15. as such, absolute performance is not our main concern in this design
  16. if the process you intend to cache is slow relative to a pickling of the datastructure on which it acts,
  17. you probably shouldnt be caching it in the first place. but if this is a concern,
  18. pickling a string is trivial; so if you feel you can do better serialization, you are welcome to
  19.  
  20.  
  21. three layered architecture:
  22.    shelve2:
  23.        efficient thread/process safe object-to-object mapping
  24.    generic caching layer:
  25.        environment caching, pycc integration, deferred tokens, NFS safety?
  26.    project specific code deriving from abstract cache object:
  27.        external compilation caching
  28.        graph optimization caching
  29.        or other arbitrarily fine-grained processes, each implementing an appropriate environment overload
  30.  
  31.  
  32. is this architecture sufficient?
  33. the price for the simplicity of this architecture is a slight wastefulness;
  34. the source code of a function with different overloads will be stored in its entirety for each overload, for instance
  35. but until proven otherwise, i think this should be considered premature optimization
  36. trading kilobytes of harddisk space for seconds of runtime performance is the nature of the game we are playing
  37. your mileage may vary, but i have a lot more kilobytes of drive space than i have seconds of patience
  38. one way to optimize this might be to use a custom serializer, where subobjects marked as being suspected of being highly repetitive
  39. get placed in a separate table, and are subsequently referred to by their unique rowid in other tables
  40. """
  41.  
  42. import os
  43.  
  44. import tempfile
  45. from shelve2 import Shelve, process_key
  46. from time import clock, sleep, time
  47.  
  48. import threading
  49. import collections
  50.  
  51. import cPickle as Pickle
  52. import hashlib
  53.  
  54. import numpy as np
  55.  
  56.  
  57.  
  58.  
  59.  
  60.  
  61. temppath = tempfile.gettempdir()
  62. cachepath = os.path.join(temppath, 'pycache')
  63. try:
  64.     os.mkdir(cachepath)
  65. except:
  66.     pass
  67.  
  68. import datetime
  69. datetime.datetime
  70.  
  71. class Deferred(object):
  72.     """timestamped deferred token"""
  73.     def __init__(self):
  74.         self.stamp = time()
  75.     def expired(self, timeout):
  76.         dt = time() - self.stamp
  77.         return dt > timeout or dt < 0
  78.  
  79.  
class Cache(object):
    """
    thread and process safe disk based cache,
    mapping pickleable python objects to pickleable python objects

    the environment fingerprint is stored twice: hashed into the database
    filename, and pickled verbatim into a 'meta' table inside the database,
    so that a filename hash collision between different environments can be
    detected on load

    how to handle environment?
    store environment as separate pickle? or as a single table?
    also this environment mechanism needs to be thread and process safe

    should we add an access time column? or even usage statistics?
    we could, but not much point; there already is a pretty effective memory use limit in play;
    the disk cleanup of the temporary directory we use
    """

    def __init__(self, identifier, deferred_timeout = 10, size_limit=100):
        """
        identifier: name of the database file in which to store the cache
        deferred_timeout: seconds to wait on another worker's pending
            computation before assuming it died and starting our own
        size_limit: size limit in mb (currently unenforced)

        opening strategy: try to load an existing cache; failing that, try
        to create one; if creation races with another process, load the
        file that process just published
        """
        self.identifier         = identifier                                #name of database file in which to store
        self.deferred_timeout   = deferred_timeout                          #timeout in seconds to wait for pending action before starting a new one
##        self.locks              = collections.defaultdict(threading.Lock)   #only one thread may modify any given entry at a time
        self.lock = threading.Lock()  #single lock should do. not much reason to have multiple threads compiling
        self.size_limit         = size_limit    #size limit in mb. do we really care? we work in a temp directory anyway

        #the pickled environment acts as a fingerprint of all machine state
        #that may influence the cached computation; its hash becomes part of
        #the cache filename, so different environments use different files
        estr  = Pickle.dumps(self.environment())
        ehash = hashlib.md5(estr).hexdigest()
        self.filename           = os.path.join(cachepath, identifier+'_'+ehash)

        def create_shelve():
            """
            guarantee atomicity of cache creation during multi-processing
            create database as a tempfile, then rename to actual name when ready
            raises if another process won the rename race
            """

            tfile = tempfile.NamedTemporaryFile(delete = False)
            shelve             = Shelve(tfile.name, autocommit=True)
            #add a meta blob to our shelve, to uniquely describe our environment
            TABLE='CREATE TABLE IF NOT EXISTS meta (env BLOB)'
            shelve.conn.execute(TABLE)
            ADD_ITEM = 'INSERT INTO meta (env) VALUES (?)'
            shelve.conn.execute(ADD_ITEM, (estr,))

            shelve.close()
            tfile.close()
            sleep(0.1)     #wait for file to really close before renaming. surely this can be done better?

            try:
                #NOTE(review): the "loser detection" here relies on os.rename
                #failing when the target already exists; that holds on Windows,
                #but on POSIX rename silently replaces the target -- confirm
                #the intended platforms
                os.rename(tfile.name, self.filename)
                return Shelve(self.filename, autocommit=True)
            except Exception as e:
                print e, type(e)
                #someone beat us to creation
                os.remove(tfile.name)
                raise Exception()

        def load_shelve():
            """
            open an existing cache file, verifying that its stored environment
            blob matches ours byte-for-byte; raises on absence or mismatch
            """
            if os.path.isfile(self.filename):
                shelve             = Shelve(self.filename, autocommit=True)
                try:
                    #full comparison guards against md5 collisions in the filename hash
                    if not shelve.conn.select_one('SELECT env FROM meta')[0] == estr:
                        raise Exception()
                except Exception as e:
                    print 'no env hit'
                    print e
                    shelve.close()
                    #try and kill the cache with the colliding hash, but unidentical env
                    #this may fail; perhaps it is in use by another process
                    #anything we can do about it?
                    #perhaps we should increment our hash in such a scenario
                    os.remove(self.filename)
                    raise Exception()
                return shelve
            raise Exception()

        #load -> create -> load: the final load covers the case where another
        #process created the file between our failed load and failed create
        try:
            self.shelve = load_shelve()
            return
        except:
            pass
        #cache failed to load; either it didn't exist, or it did exist but had to be removed due to a hash collision
        try:
            self.shelve = create_shelve()
            return
        except:
            pass
        #if creation failed as well, it may be because another process just beat us to it
        #try loading once more
        self.shelve = load_shelve()

    def __getitem__(self, key):
        """
        look up key, computing and storing the value via self.cached on a miss

        a Deferred token is written before computing, so concurrent readers
        (in this or another process) poll instead of duplicating the work;
        a token older than deferred_timeout is assumed orphaned and the
        computation is redone
        """
        with self.lock:    #want only one thread at a time accessing this; or only one thread per key?
            while True:
                try:
                    value = self.shelve[key]
                    if isinstance(value, Deferred):
                        if value.expired(self.deferred_timeout):
                            #stale token; jump to the except branch to recompute
                            raise Exception()
                        sleep(0.01)
                    else:
                        return value
                except:
                    #reached on a missing key or an expired deferred token
                    self.shelve[key] = Deferred()
                    #active updating of the deferred object to prove activity?
                    value = self.cached(key)
                    self.shelve[key] = value
                    return value

    def __delitem__(self, key):
        """remove key from the cache, delegating to the underlying shelve"""
        with self.lock:
            del self.shelve[key]

    def cached(self, input):
        """
        implements the cached operation; to be invoked upon a cache miss
        input is a picklable python object
        the returned output should be a picklable python object as well
        """
        raise NotImplementedError()

    def environment(self):
        """
        returns a pickleable object describing the environment of the cached operation,
        or a description of the state of your computer which may influence the relation
        between the input and output of Cache.cached
        """
        raise NotImplementedError()
  210.  
  211.  
  212.  
  213. """
  214. client code starts here;
  215. from pycache import Cache
  216. """
  217.  
  218.  
  219.  
  220. import numpy as np
  221.  
  222. class CompilationCache(Cache):
  223.     """
  224.    subclass implements the actual cached operation
  225.    """
  226.     def cached(self, source):
  227.         n, s = source
  228.  
  229.         print 'compiling'
  230.         sleep(3)
  231.         q = np.array(list(s*n))
  232.         return np.sort(q).tostring()
  233.  
  234.  
  235.     def environment(self):
  236.         version='3.4'
  237.         compiler='llvm'
  238.         return version, compiler
  239.  
  240. class GraphOptimizationCache(Cache):
  241.     identifier = 'theano_graph'
  242.  
  243.     def cached(self, source):
  244.         n, s = source
  245.  
  246.         print 'compiling'
  247.         sleep(3)
  248.         q = np.array(list(s*n))
  249.         return np.sort(q).tostring()
  250.  
  251.  
  252.     def environment(self):
  253.         import numba
  254.         version = numba.__version__
  255.  
  256.         import inspect
  257.         files = [numba.Function, numba.Accessor]    #some random files....
  258.         informal_version = [inspect.getsource(file) for file in files]
  259.  
  260.         return version, informal_version, 999
  261.  
  262.  
  263. ##cache = CompilationCache('theano')
  264.  
  265. cache = GraphOptimizationCache('theano_graph')
  266.  
  267. ##quit()
  268. def worker(arg):
  269.     value = cache[arg]
  270.     return value
  271.  
  272.  
  273. if __name__=='__main__':
  274.  
  275.     #test compiling the same function many times, or compilaing different functions concurrently
  276.     args = [(3,'The quick brown fox, and so on \n aaa')]*4
  277.     #args = enumerate( ['The quick brown fox, and so on \n aaa']*4)
  278.  
  279.     #run multiple jobs concurrent as either processes or threads
  280.     threading=False
  281.     if threading:
  282.         import multiprocessing.dummy as multiprocessing
  283.     else:
  284.         import multiprocessing
  285.  
  286.     #sleep(0.1)
  287.  
  288.     pool = multiprocessing.Pool(4)
  289.     for r in pool.imap(worker,  args):
  290.         print r
  291. ##    print cache[(3,'a')]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement