Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
class FileUtils:
    """Utility wrapper around an Amazon S3 bucket for a single company's files."""

    def __init__(self, companyID):
        """Bind this helper to one company and open its S3 bucket.

        Parameters:
            companyID - identifier used to build the company's root folder
        """
        self.companyID = companyID
        # All of this company's files live under this prefix in the bucket.
        self.rootFolder = '{0}Client Number/{1}/'.format(settings.FILE_ROOT, companyID)
        # Open the connection to S3 and grab the configured bucket.
        self.s3Connection = boto.connect_s3()
        self.s3Bucket = self.s3Connection.get_bucket(settings.S3_BUCKET)
- def save_file(self, path=None, data=None):
- """
- This method saves a file to the given path. If none is given for either the path or the data it will return false.
- Parameters:
- path - The path to save the file in
- data - The file to be saved
- Returns:
- True if the file was saved successfully otherwise an error will be thrown.
- """
- if data is None or path is None:
- return False
- if not "Files/Client Number/" in path:
- path = "%s%s" % (self.rootFolder, path)
- k = self.s3Bucket.new_key(path)
- if (path[path.rfind('.'):] == '.pdf'):
- k.set_contents_from_string(data, headers={'Content-Type': 'application/pdf'})
- else:
- k.set_contents_from_string(data)
- return self.get_contents(None, path[:path.rfind('/') + 1])
- def create_folder(self, request, path=""):
- """
- This method creates a folder in the given path. If one is not given it is created in the root.
- Parameters:
- path - where the folder will be created. Empty string will create the folder in root.
- Returns:
- True if the folder was created succesfully.
- """
- if request.POST.get('name') is None:
- return False
- newFolderName = request.POST.get('name', '')
- if path == '':
- path = self.rootFolder
- newFolder = '%s%s/' % (path, newFolderName)
- try:
- k = self.s3Bucket.new_key(newFolder)
- k.set_contents_from_string("")
- except:
- return HttpResponse("Folder Name in Use", mimetype="text/plain")
- return self.get_contents(None, path)
- def delete(self, request, path=None):
- """
- This method deletes the element given in path.
- Parameters:
- path - path to the element to be deleted
- Returns:
- Contents of the parent folder
- """
- if path is None:
- return False
- #get current shares that are contained in this share and unshare them
- PendingFileSharesPermissions.objects.filter(path__path__contains=path).delete()
- PendingFileShares.objects.filter(path__contains=path).delete()
- #get current shares that are contained in this share and unshare them
- FileSharesPermissions.objects.filter(path__path__contains=path).delete()
- FileShares.objects.filter(path__contains=path).delete()
- if FileUtils.is_file(path):
- parent = path[:path.rfind('/') + 1]
- elif FileUtils.is_folder(path):
- keyList = self.s3Bucket.list(prefix=path)
- for key in keyList:
- self.s3Bucket.delete_key(key.name)
- parent = path[:path.rfind('/')]
- parent = parent[:parent.rfind('/') + 1]
- k = self.s3Bucket.get_key(path)
- if (k is not None):
- self.s3Bucket.delete_key(k)
- return self.get_contents(None, parent)
- def get_file(self, path=None, isFile=True):
- """
- This method fetches the file at path and returns it.
- Parameters:
- path - The path of the requested file
- Returns:
- The file at the requested path
- """
- if path is None:
- return None
- if isFile:
- allowedAccess = False
- if path[:self.rootFolder.__len__()] == self.rootFolder:
- allowedAccess = True
- else:
- shares = FileSharesPermissions.objects.filter(company=self.companyID)
- for share in shares:
- if share.path.path in path:
- allowedAccess = True
- break
- if not allowedAccess:
- return False
- k = self.s3Bucket.get_key(path)
- url = ''
- if k:
- url = k.generate_url(120)
- return url
- def get_raw_file(self, path=None, isFile=True):
- """
- This method fetches the file at path and returns it.
- Parameters:
- path - The path of the requested file
- Returns:
- The file at the requested path
- """
- if path is None:
- return None
- if isFile:
- allowedAccess = False
- if path[:self.rootFolder.__len__()] == self.rootFolder:
- allowedAccess = True
- else:
- shares = FileSharesPermissions.objects.filter(company=self.companyID)
- for share in shares:
- if share.path.path in path:
- allowedAccess = True
- break
- if not allowedAccess:
- return False
- k = self.s3Bucket.get_key(path)
- fileName = k.get_contents_as_string()
- return fileName
- def move(self, request, path=None):
- """
- This method renames the file by copying the key to a new key with the new name and deleting the old one.
- Functions like the Unix move command.
- Parameters:
- request - contains POST data with the new name
- path - The relative path of the item to be moved
- Returns:
- Contents of the parent folder to be used in refreshing the list
- """
- if path is None:
- return None
- #Get the path
- shared = True
- if path[:self.rootFolder.__len__()] == self.rootFolder:
- shared = False
- #Move the file or folder
- if FileUtils.is_file(path):
- #Move places the file into the new folder
- if request.POST.get('move') == 'true':
- if request.POST.get('name') != "fileslist":
- newFolder = "%s%s" % (self.rootFolder, request.POST.get('name'))
- else:
- newFolder = "%s" % (self.rootFolder)
- if not self.s3Bucket.get_key(newFolder):
- k = self.s3Bucket.new_key(newFolder)
- k.set_contents_from_string("")
- newName = "%s%s" % (newFolder, path[path.rfind('/')+1:])
- #Non-Move calls rename the file in its current folder
- else:
- if not self.s3Bucket.get_key(path[:path.rfind('/')+1]):
- k = self.s3Bucket.new_key(path[:path.rfind('/')+1])
- k.set_contents_from_string("")
- newName = "%s%s%s" % (path[:path.rfind('/')+1], request.POST.get('name'), path[path.rfind('.'):])
- self.s3Bucket.copy_key(newName, settings.S3_BUCKET, path)
- #If the file is not in a shared directory we can remove it and update file sharing if it exists
- if not shared:
- shared = FileShares.objects.filter(path=path)
- for share in shared:
- share.path = newName
- share.save()
- self.s3Bucket.delete_key(path)
- #Folder names need to be pulled out of their current path
- elif FileUtils.is_folder(path):
- #Move holds the folder name the same but appends it to the desitination folder
- if request.POST.get('move') == 'true':
- newName = path[:path.rfind('/')]
- newName = newName[newName.rfind('/')+1:]
- newName = "%s%s%s/" % (self.rootFolder, request.POST.get('name'), newName)
- #Rename changes the folder name
- else:
- newName = path[:path.rfind('/')]
- newName = newName[:newName.rfind('/')+1]
- newName = "%s%s/" % (newName, request.POST.get('name'))
- #Update any filesharing information
- shared = FileShares.objects.filter(path=path)
- for share in shared:
- share.path = newName
- share.save()
- #Alter the contents of the folder
- FileUtils._move_folder(self, path, newName)
- #Get the parent to return it's contents
- if FileUtils.is_file(path):
- parent = path[:path.rfind('/') + 1]
- elif FileUtils.is_folder(path):
- parent = path[:path.rfind('/')]
- parent = parent[:parent.rfind('/') + 1]
- return self.get_contents(None, parent)
- def move_file(self, oldPath, newPath):
- self.s3Bucket.copy_key(newPath, settings.S3_BUCKET, oldPath)
- self.s3Bucket.delete_key(oldPath)
- return True
- def _move_folder(self, path, name):
- """
- Recursive helper method to move folders and their containing files
- Parameters:
- self
- path - key name for the folder to be changed
- name - new key name to replace old path
- """
- for key in self.s3Bucket.list(prefix=path, delimiter="/"):
- if FileUtils.is_file(key.name):
- newName = "%s%s" %(name, key.name[key.name.rfind('/')+1:])
- self.s3Bucket.copy_key(newName, settings.S3_BUCKET, key.name)
- self.s3Bucket.delete_key(key.name)
- elif FileUtils.is_folder(key.name):
- newFolder = key.name[:key.name.rfind('/')]
- newFolder = "%s%s/" % (name, newFolder[newFolder.rfind('/')+1:])
- if self.s3Bucket.get_key(key.name):
- self.s3Bucket.copy_key(newFolder, settings.S3_BUCKET, key.name)
- self.s3Bucket.delete_key(key.name)
- else:
- k = self.s3Bucket.new_key(newFolder)
- k.set_contents_from_string("")
- if key.name != path:
- FileUtils._move_folder(self, key.name, newFolder)
- @staticmethod
- def is_file(path):
- """
- This method may need to become more robust in the future, but it's purpose is to detect within amazon s3 whether a given path is a file.
- Parameters:
- path - The path of the file
- Returns:
- true if the path is a file, otherwise false
- """
- foldername, filename = os.path.split(path)
- return filename != ''
- @staticmethod
- def is_folder(path):
- """
- This method may need to become more robust in the future, but it's purpose is to detect within amazon s3 whether a given path is a folder.
- Parameters:
- path - The path of the folder
- Returns:
- true if the path is a folder, otherwise false
- """
- foldername, filename = os.path.split(path)
- return filename == ''
- def get_contents(self, subFolder=None, path=None):
- """
- This method gets the file list for all files and what folder they are contained in for a given path. It drives the recursive method that gets the contents for all folders and their subfolders.
- Parameters:
- subFolder - The subfolder of the root directory to begin at
- path - Explicit path to get contents
- Returns:
- A dictionary of files and folders with their contents.
- """
- contents = self._get_contents(subFolder, path)
- if path is None or path == self.rootFolder:
- #now get the shared stuff
- sharedContents = {'folders':[], 'files':[]}
- shared = FileSharesPermissions.objects.filter(company__in=[self.companyID, 0])
- for share in shared:
- path = share.path.path
- if path[:self.rootFolder.__len__()] == self.rootFolder:
- continue
- shareContents = {'folders':[], 'files':[]}
- for key in self.s3Bucket.list(prefix=path, delimiter="/"):
- if key.name.startswith("."):
- continue
- if FileUtils.is_folder(key.name):
- if key.name == path:
- continue
- shareContents['folders'].append({"foldername":key.name.replace(path, ""), "meta":{"size":'--', 'moddate':'--', 'fullpath':key.name}, 'contents':{'folders':[], 'files':[]}})
- if FileUtils.is_file(key.name):
- s = key.size
- d = datetime.datetime.strptime(key.last_modified[:19], "%Y-%m-%dT%H:%M:%S")
- if key.name == path:
- folderName, keyName = os.path.split(path)
- else:
- keyName = key.name.replace(path, "")
- shareContents['files'].append({"filename":keyName, "meta":{"size":FileUtils.convert_bytes(s), "moddate":d.strftime("%m/%d/%y %H:%M:%S"), 'fullpath':key.name}})
- if FileUtils.is_folder(path):
- sharedContents['folders'].append({'foldername':("%s/" % os.path.basename(os.path.normpath(path))), 'meta':{'size':'--', 'moddate':'--', 'fullpath':path}, 'contents':shareContents})
- else:
- sharedContents['files'].extend(shareContents['files'])
- contents['folders'].append({"foldername":"SHARED/", "meta":{"size":"--", 'moddate':'--'}, 'contents':sharedContents})
- return contents
- def _get_contents(self, subFolder=None, path=None):
- """
- This method gets the file list for all files and what folder they are contained in for a given path. It recursively gets the contents for all folders and their subfolders.
- Parameters:
- subFolder - The subfolder of the root directory to begin at
- path - Explicit path to get contents
- Returns:
- A dictionary of files and folders with their contents.
- """
- if subFolder is None and path is None:
- path = self.rootFolder
- elif subFolder is not None and path is None:
- path = "%s%s/" % (self.rootFolder, subFolder)
- elif subFolder is not None and path is not None:
- path = "%s%s/" % (path, subFolder)
- contents = {'folders':[], 'files':[]}
- for key in self.s3Bucket.list(prefix=path, delimiter="/"):
- if key.name.startswith("."):
- continue
- if FileUtils.is_folder(key.name):
- #Specification files will be handled by the user through the Specification Files page
- if key.name == path or key.name == self.rootFolder + 'SPECIFICATION FILES/':
- continue
- contents['folders'].append({"foldername":key.name.replace(path, ""), "meta":{"size":'--', 'moddate':'--', 'fullpath':key.name}, 'contents':{'folders':[], 'files':[]}})
- if FileUtils.is_file(key.name):
- s = key.size
- d = datetime.datetime.strptime(key.last_modified[:19], "%Y-%m-%dT%H:%M:%S")
- contents['files'].append({"filename":key.name.replace(path, ""), "meta":{"size":FileUtils.convert_bytes(s), "moddate":d.strftime("%m/%d/%y %H:%M:%S"), 'fullpath':key.name}})
- return contents
- @staticmethod
- def convert_bytes(bytes):
- """
- This is a static helper method to convert bytes into a readable format
- Parameters: bytes - Bytes to be formated
- Returns: A string representation of the bytes in a more readable format
- """
- bytes = float(bytes)
- if bytes >= 1099511627776:
- terabytes = bytes / 1099511627776
- size = '%.2fTB' % terabytes
- elif bytes >= 1073741824:
- gigabytes = bytes / 1073741824
- size = '%.2fGB' % gigabytes
- elif bytes >= 1048576:
- megabytes = bytes / 1048576
- size = '%.2fMB' % megabytes
- elif bytes >= 1024:
- kilobytes = bytes / 1024
- size = '%.2fKB' % kilobytes
- else:
- size = '%.2fB' % bytes
- return size
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement