Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # ===========================================================#
- # Project Management System. #
- # Written by Tyler Beaver, 2012. #
- # Feel free to use, but don't remove my name please? :) #
- # ===========================================================#
- import os,json
- from time import time # For unix time-stamp. Potential problem for threading?
- class Cache:
- objects = {}#Share objects!
- def __init__(self,load): self.load = load
- @staticmethod
- def needsUpdate(fn):
- if fn not in Cache.objects:
- return True
- if os.path.getmtime(fn) > Cache.objects[fn][1]:
- return True
- return False
- def load(self,fn):
- if Cache.needsUpdate(fn):
- o,m = self.load(fn),os.path.getmtime(fn)
- Cache.objects[fn] = (o,m)
- return o
- return Cache.objects[fn]
- class DataStore:
- def __init__(self,entity,*foreign,loadObject=None,saveObject=None):
- self.entity = entity
- self.foreignKeys,self.foreignLists = {},{}
- typeMap = {'list':self.foreignLists,'key':self.foreignKeys}
- for ref in foreign:
- a=typeMap.get(ref[0],None)
- if a is None:
- raise AssertionError("What kind of foreign object is this?")
- a[ref[1]] = ref[2]
- self.cache = Cache(self.loadObject)
- if not os.path.exists(self.directory):
- os.mkdir(self.directory)
- @staticmethod
- def baseDirectory():return os.environ.get("FMDataStore",os.getcwd())
- @property
- def directory(self):
- d = str(self.entity.__name__)
- return os.path.join(DataStore.baseDirectory(),d)
- def create(self,**opts):return self.entity(self,**opts)
- def store(self,entity):
- assert isinstance(entity,self.entity)
- fn = os.path.join(self.directory,entity.key+".json")
- self.saveObject(entity.dat,fn)
- def loadObject(self,fn):
- return self.entity(self,key=fn[:fn.rfind(".")],**json.load(open(fn)))
- def load(self,key,foreign=None):
- if foreign:
- print(foreign,key)
- return self.foreignKeys[foreign].load(key)
- data = self.cache.load(os.path.join(self.directory,key))
- return self.entity(self,key=key,**data)
- def __iter__(self):
- def fetchAll():
- for fn in os.listdir(self.directory):
- yield self.cache.load(os.path.join(self.directory,fn))
- return fetchAll()
- class Entity:
- def __init__(self,key=int(time()),**data):
- self.modified = False
- self.key = str(key)
- self.dat = self.template
- for key in data.keys():
- self.dat[key] = data[key]
- def __getattr__(self,attr):
- if attr in self.dat:
- v=self.dat[attr]
- if attr in self.storage.foreignKeys:
- if attr is not None:
- return self.storage.load(v,foreign=attr)
- return v
- t="{} entity has no attribute '{}'"
- raise AttributeError(t.format(type(self).__name__, attr))
- def update(self,**kwargs):
- for key in kwargs.keys():
- self.modified = True
- self.dat[key] = kwargs[key]
- class Query:
- def __init__(self,db,*filters):
- self.source=db
- self.filters = list(filters)
- def __iter__(self):
- def filter():
- for entity in self.source:
- if all(filter(entity) for filter in self.filters):
- yield entity
- return filter()
- if __name__ == "__main__":
- class Person(Entity):
- def template(self):
- return {
- 'name':"",
- 'age':0,
- 'location':None
- }
- people = DataStore(Person)
- if "yes" in input("Create a new person? >"):
- name = input("Name> ")
- age = input("Age> ")
- location = input("Location [ or 'no']> ")
- location = location if location.lower()!='no' else None
- people.create(name=name,age=age,location=location).save()
- for person in people:
- print(person)
- name = person.name
- age = person.age
- location = person.location
- intro = "This is {}.".format(name)
- b = "He is {}.".format(age) if age else "We don't know how old he is."
- c = "He lives in {}.".format(location) if location else "He would rather you didn't know where he lives."
- print(intro,b,c)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement