Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import random
- import string
- import datetime
- import pymongo
- ''' UTILS '''
def GetNewId():
    """Return a random 30-character id made of lowercase ASCII letters and digits."""
    alphabet = string.ascii_lowercase + string.digits
    picked = [random.choice(alphabet) for _ in range(30)]
    return "".join(picked)
- ''' CONTENT DUMMIES '''
class Content:
    """Dummy stand-in for file content: holds an id and a fixed size."""

    def __init__(self, id=None):
        # The size is a hard-coded placeholder value; no real content is stored.
        self.id, self.size = id, 239

    def getSize(self):
        """Return the size recorded for this content."""
        return self.size
def GetContentById(id):
    """Return a Content object carrying the given id (dummy: nothing is looked up)."""
    content = Content(id=id)
    return content
def AddNewContent():
    """Create and return a Content object with a freshly generated id."""
    fresh_id = GetNewId()
    return Content(id=fresh_id)
def GetNormPath(path):
    """Normalize a path into a list of its non-empty components.

    Args:
        path: Either a "/"-separated string or an already-normalized value
            (typically a list of components).

    Returns:
        For a string (including ``str`` subclasses), a new list of the
        non-empty segments between "/" separators, so leading, trailing and
        repeated slashes are ignored. Any other value is returned unchanged.
    """
    # isinstance (rather than an exact type comparison) also accepts str subclasses.
    if isinstance(path, str):
        return [part for part in path.split("/") if part]
    return path
- ''' DB '''
class DBFindErr(Exception):
    """Raised when a lookup that must match exactly one document does not."""

    def __init__(self):
        # super() already binds self; passing it again would make the
        # exception instance itself the first element of ``args``.
        super().__init__("Database find error")
class DB:
    """Process-wide singleton wrapping the MongoDB database named "disk".

    The class attribute ``db`` holds the single DB instance, while the
    *instance* attribute of the same name holds the pymongo database handle
    (it shadows the class attribute on that instance).
    """

    db = None

    def __new__(cls):
        # Create the one shared instance lazily on first use.
        if cls.db is None:
            cls.db = super().__new__(cls)
            # Instance attribute: the database handle, connected in __init__.
            cls.db.db = None
        return cls.db

    def __init__(self):
        # __init__ runs on every DB() call, so connect only once. Compare with
        # None explicitly: pymongo database objects do not support truth
        # testing in recent driver versions.
        if self.db is None:
            client = pymongo.MongoClient("mongodb://37.200.65.235:27017/")
            self.db = client.disk

    def getDB(self):
        """Return the underlying pymongo database handle."""
        return self.db

    def findOnce(self, tableName, filt):
        """Return the single document in ``tableName`` matching ``filt``.

        Args:
            tableName: Name of the collection to search.
            filt: Query filter passed to ``find``.

        Returns:
            The one matching document.

        Raises:
            DBFindErr: If zero or more than one document matches.
        """
        # Fetching at most two documents is enough to tell "exactly one" apart
        # from "none" and "several" without relying on Cursor.count().
        matches = list(self.getDB()[tableName].find(filt).limit(2))
        if len(matches) != 1:
            raise DBFindErr
        return matches[0]
- ''' META '''
class MetaIsInvalidErr(Exception):
    """Raised when a metadata record cannot be loaded from the database."""

    def __init__(self):
        Exception.__init__(self, "Meta is invalid")
class MetaBase:
    """Base metadata record for a disk entry, stored in the "meta" collection.

    Args:
        fileType: Entry kind tag stored with the record (subclasses pass
            "regular" or "directory").
        name: Entry name.
        parentId: Id of the parent entry, or None.
        crTime: Creation time.
        id: Id of an existing record. When None, a new id is generated.
    """

    def __init__(self, fileType=None, name=None, parentId=None, crTime=None, id=None):
        self.id = GetNewId() if id is None else id
        # Always initialise every attribute, also when an id is supplied, so
        # that save() and getInfo() work even if load() has not been called.
        self.name = name
        self.fileType = fileType
        self.parentId = parentId
        self.crTime = crTime
        self.data = {}

    def getId(self):
        """Return this record's id."""
        return self.id

    def load(self):
        """Populate this object from the stored record with the same id.

        Returns:
            self, to allow chaining.

        Raises:
            MetaIsInvalidErr: If the lookup does not find exactly one record.
        """
        try:
            self.data = DB().findOnce("meta", {"id": self.id})
        except DBFindErr as err:
            raise MetaIsInvalidErr from err
        self.name = self.data.get("name", None)
        self.fileType = self.data.get("fileType", None)
        self.parentId = self.data.get("parentId", None)
        # A record without a stored creation time falls back to "now".
        self.crTime = self.data.get("crTime", datetime.datetime.now())
        return self

    def save(self):
        """Write this record to the "meta" collection, inserting it if absent.

        Returns:
            self, to allow chaining.
        """
        self.data["id"] = self.id
        self.data["name"] = self.name
        self.data["fileType"] = self.fileType
        self.data["parentId"] = self.parentId
        self.data["crTime"] = self.crTime
        # replace_one with upsert=True is the supported equivalent of the
        # legacy whole-document Collection.update(..., upsert=True).
        DB().getDB().meta.replace_one({"id": self.id}, self.data, upsert=True)
        return self

    def getInfo(self):
        """Return a dict of human-readable attributes of this entry."""
        return {
            "name": self.name,
            "created time": self.crTime,
        }
class RegularFile(MetaBase):
    """Metadata record of type "regular" that additionally references a content id."""

    def __init__(self, contentId=None, *args, **kwargs):
        super().__init__("regular", *args, **kwargs)
        # When no content id is supplied, new content is allocated for the file.
        self.contentId = AddNewContent().id if contentId is None else contentId

    def load(self):
        """Load the base record, then read "contentId" from the loaded data; return self."""
        super().load()
        self.contentId = self.data["contentId"]
        return self

    def save(self):
        """Store "contentId" in the record data, save the base record and return self."""
        self.data["contentId"] = self.contentId
        super().save()
        return self

    def getInfo(self):
        """Return the base info extended with the entry type and content size."""
        details = super().getInfo()
        content_size = GetContentById(self.contentId).getSize()
        details.update({
            "type": "Regular file",
            "size": "{}B".format(content_size),
        })
        return details
class Directory(MetaBase):
    """Metadata record of type "directory"; its children are the records whose parentId is its id."""

    def __init__(self, *args, **kwargs):
        super().__init__("directory", *args, **kwargs)

    def getFiles(self):
        """Return a dict mapping child name to the loaded child object.

        Children are the "meta" records whose "parentId" equals this
        directory's id; each is instantiated according to its "fileType".
        """
        constructors = {
            "directory": Directory,
            "regular": RegularFile,
        }
        children = {}
        for record in DB().getDB().meta.find({"parentId": self.id}):
            child_name = record["name"]
            child = constructors[record["fileType"]](id=record["id"])
            children[child_name] = child.load()
        return children

    def getInfo(self):
        """Return the base info extended with the entry type."""
        details = super().getInfo()
        details["type"] = "Directory"
        return details
- ''' DISK '''
class FileDoesNotExistErr(Exception):
    """Raised when a path component is not found in its parent directory."""

    def __init__(self):
        Exception.__init__(self, "File doesn't exist")
class FileIsAlreadyExistErr(Exception):
    """Raised when an entry is being created at a path that already exists."""

    def __init__(self):
        Exception.__init__(self, "File is already exist")
class WrongPathErr(Exception):
    """Raised when a path cannot be followed, e.g. it passes through a non-directory."""

    def __init__(self):
        Exception.__init__(self, "Wrong path")
class ItIsNotDirectoryPathErr(Exception):
    """Raised when a directory operation is requested on a path that is not a directory."""

    def __init__(self):
        Exception.__init__(self, "It's not directory path")
class Disk:
    """A named disk: a tree of metadata entries reachable from a root directory.

    Args:
        name: Disk name.
        root: Root entry of the disk (a Directory).
    """

    def __init__(self, name, root):
        self.root = root
        self.name = name

    def getFileByPath(self, path):
        """Resolve ``path`` (string or component list) to its entry.

        Returns:
            The entry at the path; the root for an empty path.

        Raises:
            WrongPathErr: If the path continues past a non-directory entry.
            FileDoesNotExistErr: If a component is missing from its parent.
        """
        cur = self.root
        for component in GetNormPath(path):
            # isinstance also accepts subclasses of Directory.
            if not isinstance(cur, Directory):
                raise WrongPathErr
            files = cur.getFiles()
            if component not in files:
                raise FileDoesNotExistErr
            cur = files[component]
        return cur

    def fileExists(self, path):
        """Return True if ``path`` resolves to an entry, False if a component is missing.

        WrongPathErr from the lookup is deliberately not caught here and
        propagates to the caller.
        """
        try:
            self.getFileByPath(path)
        except FileDoesNotExistErr:
            return False
        return True

    def _addEntry(self, path, entryType):
        """Create and save a new ``entryType`` entry at ``path`` (shared by addFile/addDirectory)."""
        path = GetNormPath(path)
        if self.fileExists(path):
            raise FileIsAlreadyExistErr
        try:
            parent = self.getFileByPath(path[:-1])
        except (FileDoesNotExistErr, WrongPathErr):
            # Any failure to resolve the parent is reported as a wrong path;
            # the lookup error adds nothing for the caller, so drop the context.
            raise WrongPathErr from None
        entryType(parentId=parent.getId(), name=path[-1],
                  crTime=datetime.datetime.now()).save()

    def addFile(self, path):
        """Create a regular file at ``path``.

        Raises:
            FileIsAlreadyExistErr: If an entry already exists at ``path``.
            WrongPathErr: If the parent of ``path`` cannot be resolved.
        """
        self._addEntry(path, RegularFile)

    def addDirectory(self, path):
        """Create a directory at ``path``.

        Raises:
            FileIsAlreadyExistErr: If an entry already exists at ``path``.
            WrongPathErr: If the parent of ``path`` cannot be resolved.
        """
        self._addEntry(path, Directory)

    def getDirectoryFiles(self, path):
        """Return the entries of the directory at ``path``, sorted by name.

        Raises:
            ItIsNotDirectoryPathErr: If ``path`` resolves to a non-directory.
        """
        d = self.getFileByPath(path)
        if not isinstance(d, Directory):
            raise ItIsNotDirectoryPathErr
        return sorted(d.getFiles().values(), key=lambda f: f.name)

    def getFileInfo(self, path):
        """Return the info dict of the entry at ``path``."""
        return self.getFileByPath(path).getInfo()
class DiskNameIsBusyErr(Exception):
    """Raised when a disk is being created under a name that is already taken."""

    def __init__(self):
        Exception.__init__(self, "Disk name is busy")
class DiskDoesNotExist(Exception):
    """Raised when no disk with the requested name can be found."""

    def __init__(self):
        super().__init__("Disk doesn't exist")


# Alias following the "...Err" naming used by the other exceptions in this
# file, so code that refers to DiskDoesNotExistErr resolves to the same class.
DiskDoesNotExistErr = DiskDoesNotExist
def CreateDisk(name):
    """Create a new disk with an empty root directory and register it.

    Args:
        name: Name of the disk to create.

    Returns:
        The newly created Disk.

    Raises:
        DiskNameIsBusyErr: If a disk with this name is already registered.
    """
    db = DB().getDB()
    # find_one returns None when nothing matches; this avoids Cursor.count(),
    # which newer pymongo versions no longer provide.
    if db.disks.find_one({"name": name}) is not None:
        raise DiskNameIsBusyErr
    root = Directory()
    root.save()
    disk = Disk(name, root)
    # insert_one is the supported replacement for the legacy Collection.insert.
    db.disks.insert_one({
        "name": name,
        "root": root.getId(),
    })
    return disk
def GetDisk(name):
    """Load the disk registered under ``name``.

    Args:
        name: Name of the disk to open.

    Returns:
        A Disk whose root directory has been loaded from the database.

    Raises:
        DiskDoesNotExist: If the lookup does not find exactly one disk with
            this name.
    """
    try:
        diskData = DB().findOnce("disks", {"name": name})
    except DBFindErr as err:
        # Raise the exception class that is actually defined in this file.
        raise DiskDoesNotExist from err
    root = Directory(id=diskData["root"])
    root.load()
    return Disk(name, root)
- ''' SHELL '''
def ShellMode():
    """Run an interactive shell on the disk named "testDisk".

    Reads commands from standard input until "exit"/"quit" or end of input.
    Supported commands, each taking a path: add_file, add_dir, ls, info.
    Errors raised while executing a command are printed and the loop continues.
    """
    diskName = "testDisk"
    disk = GetDisk(diskName)
    print(
        "Shell mode\n"
        "root is \"/\"\n"
        "using disk \"{}\"\n"
        "Use commands:\n"
        "\t add_file <path>\n"
        "\t add_dir <path>\n"
        "\t ls <path>\n"
        "\t info <path>\n"
        "\t exit [or quit]".format(disk.name)
    )
    pathCommands = ("ls", "info", "add_file", "add_dir")
    while True:
        try:
            command = input("> ").split()
        except EOFError:
            # End of input (e.g. Ctrl-D or a closed pipe) ends the session like "exit".
            print()
            break
        if not command:
            continue
        action = command[0]
        if action in ("exit", "quit"):
            break
        if action not in pathCommands:
            print("Error: unknown command \"{}\"".format(action))
            continue
        if len(command) < 2:
            print("Error: {} requires a <path> argument".format(action))
            continue
        path = command[1]
        try:
            if action == "ls":
                for f in disk.getDirectoryFiles(path):
                    print("{}{}".format(f.name, "/" if f.fileType == "directory" else ""))
            elif action == "info":
                info = disk.getFileInfo(path)
                print("file {} info:".format(path))
                for attr, val in info.items():
                    print("\t{}: {}".format(attr, val))
            elif action == "add_file":
                disk.addFile(path)
            else:
                disk.addDirectory(path)
        except Exception as e:
            # Top-level boundary of the shell: report the failure and keep
            # the session alive instead of crashing.
            print("Error: {}".format(e))
if __name__ == "__main__":
    # Start the interactive shell only when the file is executed as a script.
    ShellMode()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement