Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- cross platform thread and process safe caching of python objects to disk
- subclassing cache.cached allow implementation of the cached operation
- this could be a compilation, and a mapping of the compilation result
- to a format importable by python, for instance
- every cache file has an environment description pickled into it.
- this should encapsulate all machine state relevant to the cached process
- the intended use case is for various stages of compilation caching
- as such, absolute performance is not our main concern in this design
- if the process you intend to cache is fast relative to a pickling of the datastructure on which it acts,
- you probably shouldn't be caching it in the first place. but if this is a concern,
- pickling a string is trivial; so if you feel you can do better serialization, you are welcome to
- three layered architecture:
- shelve2:
- efficient thread/process safe object-to-object mapping
- generic caching layer:
- environment caching, pycc integration, deferred tokens, NFS safety?
- project specific code deriving from abstract cache object:
- external compilation caching
- graph optimization caching
- or other arbitrarily fine-grained processes, each implementing an appropriate environment overload
- is this architecture sufficient?
- the price for the simplicity of this architecture is a slight wastefulness;
- the source code of a function with different overloads will be stored in its entirety for each overload, for instance
- but until proven otherwise, i think this should be considered premature optimization
- trading kilobytes of harddisk space for seconds of runtime performance is the nature of the game we are playing
- your mileage may vary, but i have a lot more kilobytes of drive space than i have seconds of patience
- one way to optimize this might be to use a custom serializer, where subobjects marked as suspected of being highly repetitive
- get placed in a separate table, and are subsequently referred to by their unique rowid in other tables
- """
- import os
- import tempfile
- from shelve2 import Shelve, process_key
- from time import clock, sleep, time
- import threading
- import collections
- import cPickle as Pickle
- import hashlib
- import numpy as np
# all cache files live in one directory under the system temp path, so the
# OS cleanup of the temp directory naturally bounds total disk use
temppath = tempfile.gettempdir()
cachepath = os.path.join(temppath, 'pycache')
try:
    os.mkdir(cachepath)
except OSError:
    # directory already exists (or is not creatable; in the latter case any
    # later attempt to open a cache file will surface a clearer error)
    pass

import datetime  # NOTE(review): unused in this file; kept in case unseen code relies on it
class Deferred(object):
    """Placeholder token marking a cache entry whose value is being computed.

    Records its creation time so that stale tokens (left behind by a crashed
    or hung producer) can be detected and replaced.
    """

    def __init__(self):
        # creation timestamp in seconds since the epoch
        self.stamp = time()

    def expired(self, timeout):
        """Return True if this token is older than `timeout` seconds.

        A negative age (e.g. clock skew between processes writing to the same
        cache file) also counts as expired.
        """
        age = time() - self.stamp
        return not (0 <= age <= timeout)
class Cache(object):
    """
    Thread- and process-safe disk based cache, mapping pickleable python
    objects to pickleable python objects.

    The description returned by ``environment()`` is hashed into the cache
    filename and also pickled into the database itself, so a cache file is
    only reused when the environment matches exactly; a hash collision with
    a differing environment causes the stale file to be discarded.

    There is no access-time/usage bookkeeping: the OS cleanup of the temp
    directory already acts as an effective size limit.
    """
    def __init__(self, identifier, deferred_timeout=10, size_limit=100):
        """
        identifier       : name of the database file in which to store entries
        deferred_timeout : seconds to wait on a pending computation before
                           starting a new one
        size_limit       : size limit in mb (informational only at present;
                           see class docstring)
        """
        self.identifier = identifier
        self.deferred_timeout = deferred_timeout
        # a single lock serializes cache access from this process; there is
        # little reason to have multiple threads compiling concurrently
        self.lock = threading.Lock()
        self.size_limit = size_limit

        estr = Pickle.dumps(self.environment())
        ehash = hashlib.md5(estr).hexdigest()
        self.filename = os.path.join(cachepath, identifier + '_' + ehash)

        def create_shelve():
            """
            Guarantee atomicity of cache creation under multi-processing:
            build the database as a tempfile, then rename it into place.
            """
            tfile = tempfile.NamedTemporaryFile(delete=False)
            shelve = Shelve(tfile.name, autocommit=True)
            # add a meta blob to the shelve, uniquely describing our environment
            shelve.conn.execute('CREATE TABLE IF NOT EXISTS meta (env BLOB)')
            shelve.conn.execute('INSERT INTO meta (env) VALUES (?)', (estr,))
            shelve.close()
            tfile.close()
            # wait for the file to really close before renaming;
            # TODO: surely this can be done better?
            sleep(0.1)
            try:
                os.rename(tfile.name, self.filename)
                return Shelve(self.filename, autocommit=True)
            except Exception as e:
                # someone beat us to creation; discard our tempfile
                print('%s %s' % (e, type(e)))
                os.remove(tfile.name)
                raise

        def load_shelve():
            """Open an existing cache file, verifying its stored environment."""
            if not os.path.isfile(self.filename):
                raise IOError('no cache file: %s' % self.filename)
            shelve = Shelve(self.filename, autocommit=True)
            try:
                if shelve.conn.select_one('SELECT env FROM meta')[0] != estr:
                    raise ValueError('environment mismatch')
            except Exception as e:
                print('no env hit')
                print(e)
                shelve.close()
                # hash collision or corrupt file: try to kill the cache with
                # the colliding hash but unidentical env. this may fail if it
                # is in use by another process; anything we can do about it?
                os.remove(self.filename)
                raise
            return shelve

        try:
            self.shelve = load_shelve()
            return
        except Exception:
            pass
        # cache failed to load; either it did not exist, or it existed but
        # had to be removed due to a hash collision
        try:
            self.shelve = create_shelve()
            return
        except Exception:
            pass
        # creation may have failed because another process just beat us to
        # it; try loading once more, letting any error propagate this time
        self.shelve = load_shelve()

    def __getitem__(self, key):
        """
        Return the cached value for key, computing it via self.cached on a
        miss. While a computation is pending (possibly in another process) a
        timestamped Deferred token sits under the key; readers poll until the
        real value arrives or the token expires, in which case they recompute.
        """
        with self.lock:  # one thread at a time; per-key locking seems overkill
            while True:
                try:
                    value = self.shelve[key]
                    if not isinstance(value, Deferred):
                        return value
                    if value.expired(self.deferred_timeout):
                        # stale token from a dead producer; recompute below
                        raise KeyError(key)
                    sleep(0.01)  # someone else is computing; poll again
                except Exception:
                    # miss or expired token: claim the key with a fresh
                    # Deferred, then compute and store the real value.
                    # NOTE(review): if self.cached raises, the Deferred stays
                    # behind until it expires — same as the original design.
                    self.shelve[key] = Deferred()
                    value = self.cached(key)
                    self.shelve[key] = value
                    return value

    def __delitem__(self, key):
        """Remove key from the cache."""
        with self.lock:
            del self.shelve[key]

    def cached(self, input):
        """
        Implements the cached operation; invoked upon a cache miss.
        input is a pickleable python object; the returned output must be a
        pickleable python object as well.
        """
        raise NotImplementedError()

    def environment(self):
        """
        Return a pickleable object describing the environment of the cached
        operation: a description of the state of your machine which may
        influence the relation between the input and output of Cache.cached.
        """
        raise NotImplementedError()
- """
- client code starts here;
- from pycache import Cache
- """
- import numpy as np
class CompilationCache(Cache):
    """
    Example Cache subclass implementing the actual cached operation:
    a mock 'compilation' that sorts the characters of a repeated string.
    """
    def cached(self, source):
        """Pretend-compile; source is a (repeat_count, text) tuple."""
        n, s = source
        print('compiling')
        sleep(3)  # simulate an expensive compilation step
        chars = np.array(list(s * n))
        # tobytes() replaces the deprecated tostring(); identical output
        return np.sort(chars).tobytes()

    def environment(self):
        """Compiler version and backend; changing either invalidates the cache."""
        version = '3.4'
        compiler = 'llvm'
        return version, compiler
class GraphOptimizationCache(Cache):
    """
    Example Cache subclass keyed on the numba installation: the cache is
    invalidated when the numba version, or the source of selected numba
    classes, changes.
    """
    identifier = 'theano_graph'  # class-level default; the __init__ argument overrides it

    def cached(self, source):
        """Pretend-optimize; source is a (repeat_count, text) tuple."""
        n, s = source
        print('compiling')
        sleep(3)  # simulate an expensive optimization step
        chars = np.array(list(s * n))
        # tobytes() replaces the deprecated tostring(); identical output
        return np.sort(chars).tobytes()

    def environment(self):
        """
        Describe the numba installation: the formal version plus the source
        text of a couple of numba classes as an informal fingerprint of the
        implementation state.
        """
        import numba
        import inspect
        version = numba.__version__
        # NOTE(review): arbitrarily chosen attributes ("some random files");
        # confirm numba.Function / numba.Accessor exist in the target version
        targets = [numba.Function, numba.Accessor]
        informal_version = [inspect.getsource(t) for t in targets]
        # 999 looks like a manual cache-busting salt -- confirm intent
        return version, informal_version, 999
# module-level instance, so that pool workers (threads or forked processes)
# all resolve the same cache object
cache = GraphOptimizationCache('theano_graph')

def worker(arg):
    """Fetch arg through the shared cache, computing it on first request."""
    return cache[arg]
if __name__ == '__main__':
    # test compiling the same function many times, or compiling different
    # functions concurrently
    args = [(3, 'The quick brown fox, and so on \n aaa')] * 4
    #args = enumerate(['The quick brown fox, and so on \n aaa'] * 4)

    # run multiple jobs concurrently, as either threads or processes.
    # 'use_threads' replaces a local previously named 'threading', which
    # shadowed the threading module imported at the top of the file.
    use_threads = False
    if use_threads:
        import multiprocessing.dummy as multiprocessing
    else:
        import multiprocessing

    pool = multiprocessing.Pool(4)
    for result in pool.imap(worker, args):
        print(result)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement