Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/fossdriver/config.py b/fossdriver/config.py
- index a1980a0..7778403 100644
- --- a/fossdriver/config.py
- +++ b/fossdriver/config.py
- @@ -4,6 +4,7 @@
- import json
- import logging
- +
- class FossConfig(object):
- def __init__(self):
- @@ -24,17 +25,17 @@ class FossConfig(object):
- # check whether we got everything we expected
- isValid = True
- if self.serverUrl == "":
- - logging.error(f"serverUrl not found in config file")
- + logging.error("serverUrl not found in config file")
- isValid = False
- if self.username == "":
- - logging.error(f"username not found in config file")
- + logging.error("username not found in config file")
- isValid = False
- if self.password == "":
- - logging.error(f"password not found in config file")
- + logging.error("password not found in config file")
- isValid = False
- return isValid
- - except json.decoder.JSONDecodeError as e:
- - logging.error(f"Error loading or parsing {configFilename}: {str(e)}")
- + except ValueError as e:
- + logging.error("Error loading or parsing {}: {}".format(configFilename, str(e)))
- return False
- diff --git a/fossdriver/parser.py b/fossdriver/parser.py
- index 9bbbbd5..73f8571 100644
- --- a/fossdriver/parser.py
- +++ b/fossdriver/parser.py
- @@ -20,7 +20,7 @@ class ParsedLicense(object):
- self._id = -1
- def __repr__(self):
- - return f"ParsedLicense: {self.name} ({self._id})"
- + return "ParsedLicense: {} ({})".format(self.name, self._id)
- class ParsedJob(object):
- def __init__(self):
- @@ -30,7 +30,7 @@ class ParsedJob(object):
- self.reportId = -1
- def __repr__(self):
- - return f"Job {self._id}: {self.agent}, {self.status}"
- + return "Job {}: {}, {}".format(self._id, self.agent, self.status)
- def parseUploadDataForFolderLineItem(lineItem):
- """
- @@ -109,7 +109,7 @@ def parseUploadFormBuildToken(content):
- try:
- return soup.find("input", {"name": "uploadformbuild"}).get("value", None)
- except Exception as e:
- - logging.warn(f"Couldn't extract uploadformbuild token: {str(e)}")
- + logging.warn("Couldn't extract uploadformbuild token: {}".format(str(e)))
- return None
- def parseFolderNumber(content, folderName):
- @@ -199,7 +199,7 @@ def parseSingleJobData(content):
- soup = bs4.BeautifulSoup(jobIdString, "lxml")
- aLink = soup.a
- if aLink.contents != []:
- - job._id = int(aLink.contents[0])
- + job._id = int(aLink.contents[0])
- # third row: agent name
- jobAgentRow = jobArr[3]
- job.agent = jobAgentRow.get("1", "")
- diff --git a/fossdriver/server.py b/fossdriver/server.py
- index fc7a93b..556be82 100644
- --- a/fossdriver/server.py
- +++ b/fossdriver/server.py
- @@ -9,6 +9,7 @@ import requests
- from requests_toolbelt.multipart.encoder import MultipartEncoder
- import time
- import urllib
- +import io
- import fossdriver.parser
- @@ -20,7 +21,8 @@ class BulkTextMatchAction(object):
- self.action = ""
- def __repr__(self):
- - return f"BulkTextMatchAction: [{self.action}] {self.licenseName} ({self.licenseId})"
- + return "BulkTextMatchAction: [{}] {} ({})".format(self.action, self.licenseName, self.licenseId)
- +
- class FossServer(object):
- @@ -102,7 +104,7 @@ class FossServer(object):
- # FIXME note that using browse-processPost means we may only get
- # FIXME the first 100 uploads in the folder. may be able to check
- # FIXME iTotalDisplayRecords and loop to get more if needed
- - endpoint = f"/repo/?mod=browse-processPost&folder={folderNum}&iDisplayStart=0&iDisplayLength=100"
- + endpoint = "/repo/?mod=browse-processPost&folder={}&iDisplayStart=0&iDisplayLength=100".format(folderNum)
- results = self._get(endpoint)
- rj = json.loads(results.content)
- uploadData = rj.get("aaData", None)
- @@ -167,7 +169,8 @@ class FossServer(object):
- # determine mime type
- mime = MimeTypes()
- - murl = urllib.request.pathname2url(filePath)
- +# murl = urllib.request.pathname2url(filePath)
- + murl = urllib.pathname2url(filePath)
- mime_type = mime.guess_type(murl)
- # retrieve custom token for upload
- @@ -200,7 +203,7 @@ class FossServer(object):
- - uploadNum: valid ID number for an existing upload.
- - topTreeItemNum: valid ID number for an item in that upload.
- """
- - endpoint = f"/repo/?mod=view-license&upload={uploadNum}&item={itemNum}"
- + endpoint = "/repo/?mod=view-license&upload={}&item={}".format(uploadNum, itemNum)
- results = self._get(endpoint)
- licenses = fossdriver.parser.parseAllLicenseData(results.content)
- return licenses
- @@ -254,7 +257,7 @@ class FossServer(object):
- def _getJobSingleData(self, jobNum):
- """Helper function: Retrieve job data for a single job."""
- - endpoint = f"/repo/?mod=ajaxShowJobs&do=showSingleJob&jobId={jobNum}"
- + endpoint = "/repo/?mod=ajaxShowJobs&do=showSingleJob&jobId={}".format(jobNum)
- results = self._get(endpoint)
- job = fossdriver.parser.parseSingleJobData(results.content)
- return job
- @@ -281,7 +284,7 @@ class FossServer(object):
- values = {
- "agents[]": "agent_reuser",
- "upload": str(uploadNum),
- - "uploadToReuse": f"{reusedUploadNum},3",
- + "uploadToReuse": "{},3".format(reusedUploadNum),
- }
- self._post(endpoint, values)
- @@ -317,7 +320,7 @@ class FossServer(object):
- Arguments:
- - uploadNum: ID number of upload to export as tag-value.
- """
- - endpoint = f"/repo/?mod=ui_spdx2&outputFormat=spdx2tv&upload={uploadNum}"
- + endpoint = "/repo/?mod=ui_spdx2&outputFormat=spdx2tv&upload={}".format(uploadNum)
- self._get(endpoint)
- def GetSPDXTVReport(self, uploadNum, outFilePath):
- @@ -336,9 +339,9 @@ class FossServer(object):
- return False
- # now, go get the actual report
- - endpoint = f"/repo/?mod=download&report={job.reportId}"
- + endpoint = "/repo/?mod=download&report={}".format(job.reportId)
- results = self._get(endpoint)
- - with open(outFilePath, "w") as f:
- + with io.open(outFilePath, "w", encoding="utf-8") as f:
- f.write(results.content.decode("utf-8"))
- return True
- @@ -359,7 +362,7 @@ class FossServer(object):
- - itemNum: ID number for tree item within upload (NOT the upload number).
- - actions: list of BulkTextMatchActions to perform.
- """
- - endpoint = f"/repo/?mod=change-license-bulk"
- + endpoint = "/repo/?mod=change-license-bulk"
- # start building values
- values = {
- "refText": refText,
- @@ -371,10 +374,10 @@ class FossServer(object):
- row = 0
- for action in actions:
- # FIXME should this validate that the requested actions / lics are valid?
- - rowPrefix = f"bulkAction[{row}]"
- - values[f"{rowPrefix}[licenseId]"] = str(action.licenseId)
- - values[f"{rowPrefix}[licenseName]"] = action.licenseName
- - values[f"{rowPrefix}[action]"] = action.action
- + rowPrefix = "bulkAction[{}]".format(row)
- + values["{}[licenseId]".format(rowPrefix)] = str(action.licenseId)
- + values["{}[licenseName]".format(rowPrefix)] = action.licenseName
- + values["{}[action]".format(rowPrefix)] = action.action
- row += 1
- self._post(endpoint, values)
- diff --git a/fossdriver/tasks.py b/fossdriver/tasks.py
- index 5263a1a..c2f7f83 100644
- --- a/fossdriver/tasks.py
- +++ b/fossdriver/tasks.py
- @@ -9,7 +9,7 @@ class Task(object):
- self._type = _type
- def __repr__(self):
- - return f"Task: {self._type}"
- + return "Task: {}".format(self._type)
- def run(self):
- """Run the specified Task and return success or failure."""
- @@ -22,20 +22,20 @@ class CreateFolder(Task):
- self.parentFolderName = parentFolderName
- def __repr__(self):
- - return f"Task: {self._type} (new folder {self.newFolderName}, parent folder {self.parentFolderName})"
- + return "Task: {} (new folder {}, parent folder {})".format(self._type, self.newFolderName, self.parentFolderName)
- def run(self):
- - logging.info(f"Running task: {self._type}")
- + logging.info("Running task: {}".format(self._type))
- """Create the folder and return success or failure."""
- # FIXME check whether the folder already exists
- # first, get the parent folder ID
- parentFolderNum = self.server.GetFolderNum(self.parentFolderName)
- if parentFolderNum is None or parentFolderNum == -1:
- - logging.error(f"Failed: could not retrieve folder number for parent folder {self.parentFolderName}")
- + logging.error("Failed: could not retrieve folder number for parent folder {}".format(self.parentFolderName))
- return False
- # now, create the folder
- - logging.info(f"Creating folder {self.newFolderName} in parent folder {self.parentFolderName} ({parentFolderNum})")
- + logging.info("Creating folder {} in parent folder {} ({})".format(self.newFolderName, self.parentFolderName, parentFolderNum))
- self.server.CreateFolder(parentFolderNum, self.newFolderName, self.newFolderName)
- # FIXME check to see whether it was successfully created?
- @@ -49,27 +49,27 @@ class Upload(Task):
- self.folderName = folderName
- def __repr__(self):
- - return f"Task: {self._type} (filePath {self.filePath}, folder {self.folderName})"
- + return "Task: {} (filePath {}, folder {})".format(self._type, self.filePath, self.folderName)
- def run(self):
- - logging.info(f"Running task: {self._type}")
- + logging.info("Running task: {}".format(self._type))
- """Start the upload, wait until it completes and return success or failure."""
- # first, get the destination folder ID
- folderNum = self.server.GetFolderNum(self.folderName)
- if folderNum is None or folderNum == -1:
- - logging.error(f"Failed: could not retrieve folder number for folder {self.folderName}")
- + logging.error("Failed: could not retrieve folder number for folder {}".format(self.folderName))
- return False
- # now, start the upload
- - logging.info(f"Uploading {self.filePath} to folder {self.folderName} ({folderNum})")
- + logging.info("Uploading {} to folder {} ({})".format(self.filePath, self.folderName, folderNum))
- newUploadNum = self.server.UploadFile(self.filePath, folderNum)
- if newUploadNum is None or newUploadNum < 1:
- - logging.error(f"Failed: could not receive ID number for upload {self.filePath}")
- + logging.error("Failed: could not receive ID number for upload {}".format(self.filePath))
- return False
- - logging.info(f"Upload complete, {self.filePath} upload ID number is {newUploadNum}")
- + logging.info("Upload complete, {} upload ID number is {}".format(self.filePath, newUploadNum))
- # and wait until upload finishes unpacking
- - logging.info(f"Waiting for upload {newUploadNum} to unpack")
- + logging.info("Waiting for upload {} to unpack".format(newUploadNum))
- self.server.WaitUntilAgentIsDone(newUploadNum, "ununpack", pollSeconds=5)
- self.server.WaitUntilAgentIsDone(newUploadNum, "adj2nest", pollSeconds=5)
- @@ -82,29 +82,29 @@ class Scanners(Task):
- self.folderName = folderName
- def __repr__(self):
- - return f"Task: {self._type} (uploadName {self.uploadName}, folder {self.folderName})"
- + return "Task: {} (uploadName {}, folder {})".format(self._type, self.uploadName, self.folderName)
- def run(self):
- """Start the monk and nomos agents, wait until they complete and return success or failure."""
- - logging.info(f"Running task: {self._type}")
- + logging.info("Running task: {}".format(self._type))
- # first, get the folder and then upload ID
- folderNum = self.server.GetFolderNum(self.folderName)
- if folderNum is None or folderNum == -1:
- - logging.error(f"Failed: could not retrieve folder number for folder {self.folderName}")
- + logging.error("Failed: could not retrieve folder number for folder {}".format(self.folderName))
- return False
- uploadNum = self.server.GetUploadNum(folderNum, self.uploadName)
- if uploadNum is None or uploadNum == -1:
- - logging.error(f"Failed: could not retrieve upload number for upload {self.uploadName} in folder {self.folderName} ({folderNum})")
- + logging.error("Failed: could not retrieve upload number for upload {} in folder {} ({})".format(self.uploadName, self.folderName, folderNum))
- return False
- # now, start the scanners
- - logging.info(f"Running monk and nomos scanners on upload {self.uploadName} ({uploadNum})")
- + logging.info("Running monk and nomos scanners on upload {} ({})".format(self.uploadName, uploadNum))
- self.server.StartMonkAndNomosAgents(uploadNum)
- # and wait until both scanners finish
- - logging.info(f"Waiting for monk and nomos to finish for upload {self.uploadName} ({uploadNum})")
- - self.server.WaitUntilAgentIsDone(uploadNum, "monk", pollSeconds=5)
- - self.server.WaitUntilAgentIsDone(uploadNum, "nomos", pollSeconds=5)
- + logging.info("Waiting for monk and nomos to finish for upload {} ({})".format(self.uploadName, uploadNum))
- + self.server.WaitUntilAgentIsDone(uploadNum, "monk", pollSeconds=10)
- + self.server.WaitUntilAgentIsDone(uploadNum, "nomos", pollSeconds=10)
- return True
- @@ -115,27 +115,27 @@ class Copyright(Task):
- self.folderName = folderName
- def __repr__(self):
- - return f"Task: {self._type} (uploadName {self.uploadName}, folder {self.folderName})"
- + return "Task: {} (uploadName {}, folder {})".format(self._type, self.uploadName, self.folderName)
- def run(self):
- """Start the copyright agent, wait until it completes and return success or failure."""
- - logging.info(f"Running task: {self._type}")
- + logging.info("Running task: {}".format(self._type))
- # first, get the folder and then upload ID
- folderNum = self.server.GetFolderNum(self.folderName)
- if folderNum is None or folderNum == -1:
- - logging.error(f"Failed: could not retrieve folder number for folder {self.folderName}")
- + logging.error("Failed: could not retrieve folder number for folder {}".format(self.folderName))
- return False
- uploadNum = self.server.GetUploadNum(folderNum, self.uploadName)
- if uploadNum is None or uploadNum == -1:
- - logging.error(f"Failed: could not retrieve upload number for upload {self.uploadName} in folder {self.folderName} ({folderNum})")
- + logging.error("Failed: could not retrieve upload number for upload {} in folder {} ({})".format(self.uploadName, self.folderName, folderNum))
- return False
- # now, start the scanners
- - logging.info(f"Running copyright agent on upload {self.uploadName} ({uploadNum})")
- + logging.info("Running copyright agent on upload {} ({})".format(self.uploadName, uploadNum))
- self.server.StartCopyrightAgent(uploadNum)
- # and wait until both scanners finish
- - logging.info(f"Waiting for copyright agent to finish for upload {self.uploadName} ({uploadNum})")
- + logging.info("Waiting for copyright agent to finish for upload {} ({})".format(self.uploadName, uploadNum))
- self.server.WaitUntilAgentIsDone(uploadNum, "copyright", pollSeconds=5)
- return True
- @@ -149,37 +149,37 @@ class Reuse(Task):
- self.newFolderName = newFolderName
- def __repr__(self):
- - return f"Task: {self._type} (old: uploadName {self.oldUploadName}, folder {self.oldFolderName}; new: uploadName {self.newUploadName}, folder {self.newFolderName})"
- + return "Task: {} (old: uploadName {}, folder {}; new: uploadName {}, folder {})".format(self._type, self.oldUploadName, self.oldFolderName, self.newUploadName, self.newFolderName)
- def run(self):
- """Start the reuser agents, wait until it completes and return success or failure."""
- - logging.info(f"Running task: {self._type}")
- + logging.info("Running task: {}".format(self._type))
- # first, get the old scan's folder and upload ID
- oldFolderNum = self.server.GetFolderNum(self.oldFolderName)
- if oldFolderNum is None or oldFolderNum == -1:
- - logging.error(f"Failed: could not retrieve folder number for old folder {self.oldFolderName}")
- + logging.error("Failed: could not retrieve folder number for old folder {}".format(self.oldFolderName))
- return False
- oldUploadNum = self.server.GetUploadNum(oldFolderNum, self.oldUploadName)
- if oldUploadNum is None or oldUploadNum == -1:
- - logging.error(f"Failed: could not retrieve upload number for old upload {self.oldUploadName} in folder {self.oldFolderName} ({oldFolderNum})")
- + logging.error("Failed: could not retrieve upload number for old upload {} in folder {} ({})".format(self.oldUploadName, self.oldFolderName, oldFolderNum))
- return False
- # next, get the new scan's folder and upload ID
- newFolderNum = self.server.GetFolderNum(self.newFolderName)
- if newFolderNum is None or newFolderNum == -1:
- - logging.error(f"Failed: could not retrieve folder number for new folder {self.newFolderName}")
- + logging.error("Failed: could not retrieve folder number for new folder {}".format(self.newFolderName))
- return False
- newUploadNum = self.server.GetUploadNum(newFolderNum, self.newUploadName)
- if newUploadNum is None or newUploadNum == -1:
- - logging.error(f"Failed: could not retrieve upload number for new upload {self.newUploadName} in folder {self.newFolderName} ({newFolderNum})")
- + logging.error("Failed: could not retrieve upload number for new upload {} in folder {} ({})".format(self.newUploadName, self.newFolderName, newFolderNum))
- return False
- # now, start the reuser agent
- - logging.info(f"Running reuser agent on upload {self.newUploadName} ({newUploadNum}) reusing old upload {self.oldUploadName} ({oldUploadNum})")
- + logging.info("Running reuser agent on upload {} ({}) reusing old upload {} ({})".format(self.newUploadName, newUploadNum, self.oldUploadName, oldUploadNum))
- self.server.StartReuserAgent(newUploadNum, oldUploadNum)
- # and wait until reuser finishes
- - logging.info(f"Waiting for reuser to finish for upload {self.newUploadName} ({newUploadNum}) reusing old upload {self.oldUploadName} ({oldUploadNum})")
- + logging.info("Waiting for reuser to finish for upload {} ({}) reusing old upload {} ({})".format(self.newUploadName, newUploadNum, self.oldUploadName, oldUploadNum))
- self.server.WaitUntilAgentIsDone(newUploadNum, "reuser", pollSeconds=5)
- return True
- @@ -215,15 +215,15 @@ class BulkTextMatch(Task):
- # need to get upload ID + upload item ID so we can get licenses
- folderNum = self.server.GetFolderNum(self.folderName)
- if folderNum is None or folderNum == -1:
- - logging.error(f"Failed: could not retrieve folder number for folder {self.folderName} when getting licenses")
- + logging.error("Failed: could not retrieve folder number for folder {} when getting licenses".format(self.folderName))
- return -1
- u = self.server._getUploadData(folderNum, self.uploadName, False)
- if u is None:
- - logging.error(f"Failed: could not retrieve upload data for upload {self.uploadName} when getting licenses")
- + logging.error("Failed: could not retrieve upload data for upload {} when getting licenses".format(self.uploadName))
- return -1
- self.parsedLicenses = self.server.GetLicenses(u._id, u.topTreeItemId)
- if self.parsedLicenses is None or self.parsedLicenses == []:
- - logging.error(f"Failed: could not retrieve licenses for upload {self.uploadName}")
- + logging.error("Failed: could not retrieve licenses for upload {}".format(self.uploadName))
- return -1
- lic = self.server.FindLicenseInParsedList(self.parsedLicenses, licenseName)
- @@ -233,25 +233,24 @@ class BulkTextMatch(Task):
- """Helper function: make an action entry at the time the Task is being run."""
- licenseId = self._findLicenseID(licenseName)
- if licenseId == -1:
- - logging.error(f"Failed: could not get license ID for license {licenseName}")
- + logging.error("Failed: could not get license ID for license {}".format(licenseName))
- return None
- return self.server.MakeBulkTextMatchAction(licenseId, licenseName, actionType)
- def run(self):
- """Start the monkbulk agent, wait until it completes and return success or failure."""
- - logging.info(f"Running task: {self._type}")
- - # first, get the folder and then upload ID, and full upload data
- + logging.info("Running task: {}".format(self._type))
- folderNum = self.server.GetFolderNum(self.folderName)
- if folderNum is None or folderNum == -1:
- - logging.error(f"Failed: could not retrieve folder number for folder {self.folderName}")
- + logging.error("Failed: could not retrieve folder number for folder {}".format(self.folderName))
- return False
- uploadNum = self.server.GetUploadNum(folderNum, self.uploadName)
- if uploadNum is None or uploadNum == -1:
- - logging.error(f"Failed: could not retrieve upload number for upload {self.uploadName} in folder {self.folderName} ({folderNum})")
- + logging.error("Failed: could not retrieve upload number for upload {} in folder {} ({})".format(self.uploadName, self.folderName, folderNum))
- return False
- u = self.server._getUploadData(folderNum, self.uploadName, False)
- if u is None:
- - logging.error(f"Failed: could not retrieve upload data for upload {self.uploadName} when getting licenses")
- + logging.error("Failed: could not retrieve upload data for upload {} when getting licenses".format(self.uploadName))
- return -1
- # next, create the real actions list from the action tuples
- @@ -259,16 +258,16 @@ class BulkTextMatch(Task):
- for (licenseName, actionType) in self.actionTuples:
- a = self._makeRealAction(licenseName, actionType)
- if a is None:
- - logging.error(f"Failed: could not create action for ({licenseName}, {actionType}) for upload {self.uploadName}")
- + logging.error("Failed: could not create action for ({}, {}) for upload {}".format(licenseName, actionType, self.uploadName))
- return False
- actionList.append(a)
- # now, start the bulk text match agent
- - logging.info(f"Running monkbulk agent on upload {self.uploadName} ({uploadNum})")
- + logging.info("Running monkbulk agent on upload {} ({})".format(self.uploadName, uploadNum))
- self.server.StartBulkTextMatch(self.refText, u.topTreeItemId, actionList)
- # and wait until agent finishes
- - logging.info(f"Waiting for monkbulk to finish for upload {self.uploadName} ({uploadNum})")
- + logging.info("Waiting for monkbulk to finish for upload {} ({})".format(self.uploadName, uploadNum))
- self.server.WaitUntilAgentIsDone(uploadNum, "monkbulk", pollSeconds=5)
- return True
- @@ -281,27 +280,27 @@ class SPDXTV(Task):
- self.outFilePath = outFilePath
- def __repr__(self):
- - return f"Task: {self._type} (uploadName {self.uploadName}, folder {self.folderName}) to file {self.outFileName}"
- + return "Task: {} (uploadName {}, folder {}) to file {}".format(self._type, self.uploadName, self.folderName, self.outFilePath)
- def run(self):
- """Start the spdx2tv agents, wait until it completes and return success or failure."""
- - logging.info(f"Running task: {self._type}")
- + logging.info("Running task: {}".format(self._type))
- # first, get the folder and then upload ID
- folderNum = self.server.GetFolderNum(self.folderName)
- if folderNum is None or folderNum == -1:
- - logging.error(f"Failed: could not retrieve folder number for folder {self.folderName}")
- + logging.error("Failed: could not retrieve folder number for folder {}".format(self.folderName))
- return False
- uploadNum = self.server.GetUploadNum(folderNum, self.uploadName)
- if uploadNum is None or uploadNum == -1:
- - logging.error(f"Failed: could not retrieve upload number for upload {self.uploadName} in folder {self.folderName} ({folderNum})")
- - return False
- + logging.error("Failed: could not retrieve upload number for upload {} in folder {} ({})".format(self.uploadName, self.folderName, folderNum))
- + return False
- # now, start the export agent
- - logging.info(f"Running spdx2tv agent on upload {self.uploadName} ({uploadNum})")
- + logging.info("Running spdx2tv agent on upload {} ({})".format(self.uploadName, uploadNum))
- self.server.StartSPDXTVReportGeneratorAgent(uploadNum)
- # wait until the export agent finishes
- - logging.info(f"Waiting for spdx2tv to finish for upload {self.uploadName} ({uploadNum})")
- + logging.info("Waiting for spdx2tv to finish for upload {} ({})".format(self.uploadName, uploadNum))
- self.server.WaitUntilAgentIsDone(uploadNum, "spdx2tv", pollSeconds=5)
- # finally, get and save the SPDX file
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement