Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from logger import logger
- from time import sleep
- from logger import logger
- from cm_api.api_client import ApiResource, ApiException
- from cm_api.endpoints import host_templates
- from cm_api.endpoints.clusters import ApiCluster
- from managed import managed
class CMApiHelper:
    """Thin helper around the Cloudera Manager API for a single cluster.

    Groups the host-level operations used by this module (install, add to
    cluster, apply a host template, start roles, recommission, health check)
    together with helpers that refresh the "yarn" service.

    NOTE(review): this class was recovered from a paste that had lost all
    indentation. Nesting was reconstructed from syntax; the places where
    more than one nesting was plausible carry their own NOTE(review).
    """

    def __init__(self, cm_host, cm_user, cm_password, cm_cluster_name):
        """Open the API connection and bind to one cluster.

        Args:
            cm_host: Host passed to ``ApiResource``.
            cm_user: Username passed to ``ApiResource``.
            cm_password: Password passed to ``ApiResource``.
            cm_cluster_name: Name of the cluster to operate on.
        """
        self.api = ApiResource(cm_host, username=cm_user, password=cm_password)
        self.cm_cluster_name = cm_cluster_name
        self.cm = self.api.get_cloudera_manager()
        self.cluster = ApiCluster(self.api, cm_cluster_name)

    def refresh_yarn(self):
        """Refresh every RESOURCEMANAGER role of the "yarn" service."""
        service = self.cluster.get_service("yarn")
        for role in service.get_all_roles():
            if role.type != 'RESOURCEMANAGER':
                continue
            service.refresh(role.name)
            # The refresh command returns immediately. Sleep to let the
            # refresh take effect.
            # NOTE(review): the sleep is placed directly after each refresh,
            # as in the flattened source; confirm it was not meant to run
            # once after the loop.
            sleep(30)

    def refresh_nodemanager(self, host):
        """Refresh every role on ``host`` that belongs to the "yarn" service.

        Despite the name, the filter is on the service name only, so any
        yarn role found on the host is refreshed.

        Args:
            host: Identifier passed to ``self.api.get_host``.

        Raises:
            Exception: If the host reports no roles.
        """
        host_info = self.api.get_host(host)
        if not host_info.roleRefs:
            raise Exception("Host does not have any active roles, setup must have failed!")
        for role_ref in host_info.roleRefs:
            if role_ref.serviceName == 'yarn':
                yarn = self.cluster.get_service("yarn")
                yarn.refresh(role_ref.roleName)

    @managed
    def host_install(self, host_user, host, private_key):
        """Run the Cloudera Manager host install on one host and wait for it.

        Args:
            host_user: User name forwarded to ``cm.host_install``.
            host: Host to install; sent as a single-element list.
            private_key: Path to a private key file. The file's contents
                (not the path) are forwarded to the API.

        Returns:
            Whatever ``wait()`` on the install command returns.
        """
        with open(private_key, 'r') as key_file:
            private_key_contents = key_file.read()
        command = self.cm.host_install(host_user, [host],
                                       private_key=private_key_contents,
                                       java_install_strategy="NONE")
        return command.wait()

    @managed
    def add_host(self, host):
        """Add ``host`` to the cluster, tolerating "already a member" errors.

        Args:
            host: Host to add; sent as a single-element list.

        Raises:
            ApiException: For any API error whose message does not contain
                "already belongs to DbCluster".
        """
        try:
            self.cluster.add_hosts([host])
        except ApiException as e:
            # A host that is already part of a cluster is not an error here;
            # everything else is re-raised unchanged.
            if "already belongs to DbCluster" not in e.message:
                raise
        else:
            # Pause only after a successful add, exactly as before: when
            # add_hosts raised, the original never reached its sleep either.
            sleep(10)

    @managed
    def start_roles(self, host):
        """Start the roles on ``host`` and wait for the command.

        Returns:
            Whatever ``wait()`` on the start-roles command returns.
        """
        return self.cm.hosts_start_roles([host]).wait()

    @managed
    def apply_host_template(self, role_template, host):
        """Apply a host template to ``host`` without starting its roles.

        Args:
            role_template: Template name forwarded to
                ``host_templates.apply_host_template``.
            host: Host to apply the template to; sent as a single-element
                list.

        Returns:
            Whatever ``wait()`` on the apply command returns.
        """
        command = host_templates.apply_host_template(
            self.api, role_template, self.cm_cluster_name, [host],
            start_roles=False)
        return command.wait()

    @managed
    def host_recommission(self, host):
        """Recommission ``host`` and wait for the command.

        Returns:
            Whatever ``wait()`` on the recommission command returns.
        """
        return self.cm.hosts_recommission([host]).wait()

    @managed
    def check_host(self, host):
        """Verify that the yarn roles on ``host`` are started and healthy.

        For each yarn role on the host, the role must be in state 'STARTED'
        with health summary 'GOOD'. The first role that is not triggers a
        recovery attempt (refresh yarn, refresh the host's yarn roles,
        restart the role) and then a ``Warning`` is raised, so later roles
        are not inspected.

        Args:
            host: Identifier passed to ``self.api.get_host``.

        Raises:
            Exception: If the host reports no roles.
            Warning: If a yarn role was not started/healthy; recovery has
                already been attempted when this is raised.
        """
        host_info = self.api.get_host(host)
        if not host_info.roleRefs:
            # Message text is kept verbatim; callers may match on it.
            raise Exception("Host does not have any active roles, setup must of failed!")
        for role_ref in host_info.roleRefs:
            if role_ref.serviceName != 'yarn':
                continue
            yarn = self.cluster.get_service("yarn")
            role = yarn.get_role(role_ref.roleName)
            if role.roleState != 'STARTED' or role.healthSummary != 'GOOD':
                self.refresh_yarn()
                self.refresh_nodemanager(host)
                yarn.restart_roles(role_ref.roleName)
                raise Warning("Yarn failed to start, attempted to start up the role")
            # NOTE(review): the flattened source had a trailing `else:` here.
            # It is read as the else of the state/health check above, which
            # makes this the per-role success path; confirm it was not a
            # for/else or the else of the serviceName test.
            logger.info("Yarn successfully started!")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement