Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
Cross-platform, thread- and process-safe caching of Python objects to disk.

tempfile gives cross-platform storage
sqlitedict gives process- and thread-safe transactions
good old pickle allows storing arbitrary keys and values

Subclassing Cache and overriding its cached() method implements the cached operation.
This could be a compilation, plus a mapping of the compilation result
to a format importable by Python, for instance.
- """
- import os
- import tempfile
- from sqlitedict import SqliteDict
- import cPickle as Pickle
- from time import clock, sleep, time
- import threading
- import collections
# NOTE(review): nothing in the visible code reads this module-level table;
# Cache builds its own per-instance one. Kept in case other code imports it.
locks = collections.defaultdict(threading.Lock)

# All cache databases live in one directory below the platform temp dir.
temppath = tempfile.gettempdir()
cachepath = os.path.join(temppath, 'pycache')
try:
    os.mkdir(cachepath)
except OSError:
    # Best effort: the usual cause is that the directory already exists from
    # an earlier run. Only OSError is ignored, so KeyboardInterrupt and
    # programming errors are no longer silently swallowed here.
    pass

import datetime
class Deferred(object):
    """Timestamped placeholder meaning "this value is still being produced".

    The creation time is kept in ``stamp`` so that a reader can decide
    whether the pending work is still plausible or should be given up on.
    """

    def __init__(self):
        # wall-clock creation time, in seconds
        self.stamp = time()

    def expired(self, timeout):
        """Return True when the token is older than ``timeout`` seconds.

        A negative age (the stamp lies in the future relative to the current
        clock reading) is also reported as expired.
        """
        age = time() - self.stamp
        if age > timeout:
            return True
        return age < 0
class Cache(object):
    """Disk-backed cache mapping picklable keys to computed values.

    Entries are kept in a SqliteDict file named ``identifier`` inside
    ``cachepath``. Subclasses provide the expensive operation by overriding
    ``cached``; ``cache[key]`` then returns the stored value, or computes,
    stores and returns it on a miss.
    """

    def __init__(self, identifier, deferred_timeout=10):
        """Open (or create) the database file for this cache.

        identifier: name of the database file in which to store entries.
        deferred_timeout: seconds to wait for a pending computation
            (a stored Deferred token) before starting a new one.
        """
        self.identifier = identifier
        self.deferred_timeout = deferred_timeout
        self.filename = os.path.join(cachepath, identifier)
        self.sql = SqliteDict(self.filename, autocommit=True)
        # One lock per pickled key: threads of this process that ask for the
        # same key take turns, different keys do not block each other.
        self.locks = collections.defaultdict(threading.Lock)

    def __getitem__(self, key):
        """Return the value for ``key``, computing and storing it if needed.

        While a non-expired Deferred token is stored under the key, the
        store is polled every 10 ms. A missing entry, an unreadable entry or
        an expired token all lead to a fresh call of ``cached(key)``.

        Exceptions raised by ``cached`` propagate to the caller; in that case
        the Deferred token written beforehand stays in the store until it
        expires.
        """
        keystr = Pickle.dumps(key, -1)
        with self.locks[keystr]:
            while True:
                try:
                    value = self.sql[keystr]
                except Exception:
                    # No entry, or one that cannot be read back: recompute.
                    # Only the lookup is guarded, so an interrupt during the
                    # wait below is no longer mistaken for a cache miss.
                    break
                if not isinstance(value, Deferred):
                    return value
                if value.expired(self.deferred_timeout):
                    # the pending computation took too long; take over
                    break
                sleep(0.01)

            # Announce the pending computation before doing the work, so that
            # other readers of the store wait instead of duplicating it.
            self.sql[keystr] = Deferred()
            value = self.cached(key)
            self.sql[keystr] = value
            return value

    def __delitem__(self, key):
        """Remove the entry stored for ``key``."""
        keystr = Pickle.dumps(key, -1)
        # fixed: the attribute is ``locks``; ``self.lock`` raised AttributeError
        with self.locks[keystr]:
            del self.sql[keystr]

    def cached(self, key):
        """Compute the value for ``key``; must be overridden by subclasses."""
        raise NotImplementedError()
- """
- client code starts here;
- from pycache import Cache
- """
- import numpy as np
class CompilationCache(Cache):
    """Example subclass that implements the actual cached operation.

    The "compilation" is a stand-in: a three second pause followed by
    sorting the elements of the repeated source.
    """

    def cached(self, source):
        """Return the sorted elements of ``s * n`` as raw bytes.

        source: an ``(n, s)`` pair; ``s`` is repeated ``n`` times.
        """
        n, s = source
        # call form prints the same text under Python 2 and Python 3
        print('compiling')
        sleep(3)  # simulate an expensive compilation
        q = np.array(list(s*n))
        # tobytes() is the current name of the deprecated tostring()
        return np.sort(q).tobytes()
# Module-level instance that worker() looks up by its global name.
cache = CompilationCache('theano_v0.6')


def worker(arg):
    """Look up ``arg`` in the module-level cache and return the result."""
    return cache[arg]
if __name__ == '__main__':
    # Test compiling the same function many times, or compiling different
    # functions concurrently.
    text = 'The quick brown fox, and so on \n aaa'
    # Four distinct (n, source) keys, so the workers compile different things.
    # Use [(3, text)] * 4 instead to make every worker request the same key.
    # The keys are enumerated once only; enumerating again at the map() call
    # would turn them into (i, (i, text)).
    args = list(enumerate([text] * 4))

    # Run multiple jobs concurrently as either processes or threads.
    # Named use_threads so the flag does not rebind the imported
    # ``threading`` module, which Cache needs for its locks.
    use_threads = False
    if use_threads:
        import multiprocessing.dummy as multiprocessing
    else:
        import multiprocessing
    pool = multiprocessing.Pool(4)
    try:
        print(list(pool.map(worker, args)))
    finally:
        # release the worker processes/threads even if a job failed
        pool.close()
        pool.join()
    ## print(cache[(3,'a')])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement