Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import logging
- import json
- import redfish.ris.tpdefs
- from redfish import AuthMethod, redfish_logger, redfish_client
- #Config logger used by HPE Restful library
- #LOGGERFILE = "RedfishApi.log"
- #LOGGERFORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
- #LOGGER = redfish_logger(LOGGERFILE, LOGGERFORMAT, logging.INFO)
- #LOGGER.info("HPE Redfish API examples")
- class RedfishObject(object):
- def __init__(self, host, login_account, login_password, log):
- try:
- self.redfish_client = redfish_client(base_url=host, \
- username=login_account, password=login_password, \
- default_prefix="/redfish/v1")
- except:
- raise
- self.LOGGER = log
- self.typepath = redfish.ris.tpdefs.Typesandpathdefines()
- self.typepath.getgen(url=host, logger=self.LOGGER)
- self.typepath.defs.redfishchange()
- self.redfish_client.login(auth=AuthMethod.SESSION)
- self.SYSTEMS_RESOURCES = self.get_resource_directory()
- self.MESSAGE_REGISTRIES = self.get_base_registry()
- def delete_obj(self):
- try:
- self.redfish_client.logout()
- except AttributeError as excp:
- pass
- def search_for_type(self, type):
- instances = []
- for item in self.SYSTEMS_RESOURCES["resources"]:
- foundsettings = False
- if "@odata.type" in item and type.lower() in \
- item["@odata.type"].lower():
- for entry in self.SYSTEMS_RESOURCES["resources"]:
- if (item["@odata.id"] + "/settings/").lower() == \
- (entry["@odata.id"]).lower():
- foundsettings = True
- if not foundsettings:
- instances.append(item)
- if not instances:
- self.LOGGER.error("\t'%s' resource or feature is not " \
- "supported on this system\n" % type)
- return instances
- def error_handler(self, response):
- if not self.MESSAGE_REGISTRIES:
- self.LOGGER.error("ERROR: No message registries found.")
- try:
- message = json.loads(response.text)
- newmessage = message["error"]["@Message.ExtendedInfo"][0]\
- ["MessageId"].split(".")
- except:
- self.LOGGER.info("\tNo extended error information returned by " \
- "iLO.\n")
- return
- for err_mesg in self.MESSAGE_REGISTRIES:
- if err_mesg != newmessage[0]:
- continue
- else:
- for err_entry in self.MESSAGE_REGISTRIES[err_mesg]:
- if err_entry == newmessage[3]:
- self.LOGGER.info("\tiLO return code %s: %s\n" % (\
- message["error"]["@Message.ExtendedInfo"][0]\
- ["MessageId"], self.MESSAGE_REGISTRIES\
- [err_mesg][err_entry]["Description"]))
- def redfish_get(self, suburi):
- """REDFISH GET"""
- return self.redfish_client.get(path=suburi)
- def redfish_patch(self, suburi, request_body, optionalpassword=None):
- """REDFISH PATCH"""
- self.LOGGER.info("PATCH " + str(request_body) + " to " + suburi + "\n")
- response = self.redfish_client.patch(path=suburi, body=request_body, \
- optionalpassword=optionalpassword)
- self.LOGGER.info("PATCH response = " + str(response.status) + "\n")
- return response
- def redfish_put(self, suburi, request_body, optionalpassword=None):
- """REDFISH PUT"""
- self.LOGGER.info("PUT " + str(request_body) + " to " + suburi + "\n")
- response = self.redfish_client.put(path=suburi, body=request_body, \
- optionalpassword=optionalpassword)
- self.LOGGER.info("PUT response = " + str(response.status) + "\n")
- return response
- def redfish_post(self, suburi, request_body):
- """REDFISH POST"""
- self.LOGGER.info("POST " + str(request_body) + " to " + suburi + "\n")
- response = self.redfish_client.post(path=suburi, body=request_body)
- self.LOGGER.info("POST response = " + str(response.status) + "\n")
- return response
- def redfish_delete(self, suburi):
- """REDFISH DELETE"""
- self.LOGGER.info("DELETE " + suburi + "\n")
- response = self.redfish_client.delete(path=suburi)
- self.LOGGER.info("DELETE response = " + str(response.status) + "\n")
- return response
- def get_resource_directory(self):
- response = self.redfish_get("/redfish/v1/resourcedirectory/")
- resources = {}
- if response.status == 200:
- resources["resources"] = response.dict["Instances"]
- return resources
- else:
- self.LOGGER.error("\tResource directory missing at " \
- "/redfish/v1/resourcedirectory" + "\n")
- def get_base_registry(self):
- response = self.redfish_get("/redfish/v1/Registries/")
- messages = {}
- location = None
- for entry in response.dict["Members"]:
- if not [x for x in ["/Base/", "/iLO/"] if x in entry["@odata.id"]]:
- continue
- else:
- registry = self.redfish_get(entry["@odata.id"])
- for location in registry.dict["Location"]:
- if "extref" in location["Uri"]:
- location = location["Uri"]["extref"]
- else:
- location = location["Uri"]
- reg_resp = self.redfish_get(location)
- if reg_resp.status == 200:
- messages[reg_resp.dict["RegistryPrefix"]] = \
- reg_resp.dict["Messages"]
- else:
- self.LOGGER.info("\t" + reg_resp.dict["RegistryPrefix"] + \
- " not found at " + location + "\n")
- return messages
Add Comment
Please, Sign In to add comment