Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/python3
- from __future__ import print_function
- import optparse, os, sys, dohq_teamcity, yaml, requests, json
- import getpass
# Base URL of the TeamCity server that every REST call in this file targets.
TEAMCITY_URL = "https://teamcity.scm.tripwirelab.com"

# Run-mode switches. main() sets them to True from the command-line flags.
TEAMCITY_DRY_RUN = False      # -n / --dry-run
TEAMCITY_FORCE_RUN = False    # -f / --force
TEAMCITY_INTERACTIVE = False  # -i / --interactive
TEAMCITY_DEBUG = False        # -d / --debug (set through enable_debug())

# (user, password) pair passed as HTTP auth; replaced by set_credentials().
tc_credentials = ('unknown', 'unknown')
def teamcity_connection():
    """Create a new ``dohq_teamcity.TeamCity`` client.

    The client points at ``TEAMCITY_URL`` and authenticates with the
    module-level ``tc_credentials`` pair as it is at call time.
    """
    client = dohq_teamcity.TeamCity(TEAMCITY_URL, auth=tc_credentials)
    return client
def set_credentials():
    """Load the TeamCity credentials into the module-level ``tc_credentials``.

    The user name must come from the ``TEAMCITY_USER`` environment variable.
    The password comes from ``TEAMCITY_PASS`` or, when that variable is not
    set, from an interactive ``getpass`` prompt.

    Raises:
        SystemExit: with status 11 when ``TEAMCITY_USER`` is not defined.
    """
    global tc_credentials
    tc_user = os.environ.get('TEAMCITY_USER')
    if tc_user is None:
        # Fatal diagnostics belong on stderr so they do not mix with output.
        print("No TEAMCITY_USER env variable defined. Exiting", file=sys.stderr)
        # sys.exit, not the bare exit(): the latter is an interactive helper
        # injected by the site module and is not guaranteed to exist.
        sys.exit(11)
    tc_pass = os.environ.get('TEAMCITY_PASS')
    if tc_pass is None:
        tc_pass = getpass.getpass()
    tc_credentials = (tc_user, tc_pass)
    print("User set to: ", tc_credentials[0])
def get_projects():
    """Return the project listing reported by the TeamCity server.

    A new connection is opened through ``teamcity_connection()`` for the call.
    """
    return teamcity_connection().projects.get_projects()
def list_projects():
    """Print the ``web_url`` of every project returned by ``get_projects()``."""
    for entry in get_projects():
        print(entry.web_url)
def list_project(projectID):
    """Print every project whose ``web_url`` or ``id`` equals *projectID*."""
    for entry in get_projects():
        is_match = entry.web_url == projectID or entry.id == projectID
        if is_match:
            print(entry)
def get_project(projectID):
    """Fetch the full details of the project identified by *projectID*.

    The project listing is scanned for the first entry whose ``web_url`` or
    ``id`` equals *projectID*; that entry is then fetched over a new
    connection. Returns ``None`` when no entry matches.
    """
    for candidate in get_projects():
        is_match = candidate.web_url == projectID or candidate.id == projectID
        if not is_match:
            continue
        return teamcity_connection().projects.get(candidate)
    return None
def get_build(buildID):
    """Fetch the build type addressed by the locator ``"id:" + buildID``."""
    locator = "id:" + buildID
    return teamcity_connection().build_types.get(locator)
def list_vcs_root(vcsRootID):
    """Look up a VCS root by ID, falling back to a project-wide listing.

    Returns the decoded JSON body when the server answers HTTP 200. For any
    other status, *vcsRootID* is handed to ``get_vcs_from_root`` instead and
    the resulting IDs are returned as a list.
    """
    endpoint = "{}/httpAuth/app/rest/vcs-roots/id:{}".format(TEAMCITY_URL, vcsRootID)
    reply = requests.get(endpoint, headers={'Accept': 'application/json'}, auth=tc_credentials)
    if reply.status_code == 200:
        return reply.json()
    return list(get_vcs_from_root(vcsRootID))
def get_branch_spec(vcsID):
    """Return the branch specification of a build type's first VCS root.

    The first current VCS root instance of *vcsID* is resolved to its root
    ID, the root is loaded with ``list_vcs_root``, and the value of the first
    property whose name contains ``teamcity:branchSpec`` is returned.
    Returns ``None`` when no such property exists.
    """
    client = teamcity_connection()
    instances = client.build_type_api.get_current_vcs_instances(vcsID)
    root_id = instances.vcs_root_instance[0].vcs_root_id
    root_details = list_vcs_root(root_id)
    return next(
        (prop["value"]
         for prop in root_details["properties"]["property"]
         if 'teamcity:branchSpec' in prop["name"]),
        None,
    )
def get_vcs_root(vcsRootID):
    """Return the decoded JSON description of the VCS root *vcsRootID*.

    The HTTP status is not inspected; the body is decoded as JSON as-is.
    """
    endpoint = "{}/httpAuth/app/rest/vcs-roots/id:{}".format(TEAMCITY_URL, vcsRootID)
    reply = requests.get(endpoint, headers={'Accept': 'application/json'}, auth=tc_credentials)
    return reply.json()
def get_vcs_root_id_by_project(projectID):
    """Return the ID of the first VCS root listed for project *projectID*.

    The response object and its text are echoed through ``dprint`` before
    the body is decoded.
    """
    endpoint = "{}/httpAuth/app/rest/vcs-roots?locator=project:(id:{})".format(TEAMCITY_URL, projectID)
    reply = requests.get(endpoint, headers={'Accept': 'application/json'}, auth=tc_credentials)
    for debug_item in (reply, reply.text):
        dprint(debug_item)
    first_root = reply.json()["vcs-root"][0]
    return first_root["id"]
def get_vcs_root_from_build(buildType):
    """Return the first ``vcs-root-entry`` of the build type *buildType*."""
    endpoint = "{}/httpAuth/app/rest/buildTypes/id:{}".format(TEAMCITY_URL, buildType)
    reply = requests.get(endpoint, headers={'Accept': 'application/json'}, auth=tc_credentials)
    entries = reply.json()["vcs-root-entries"]["vcs-root-entry"]
    return entries[0]
def get_vcs_from_root(rootID):
    """Collect the IDs of all VCS roots used by the build types of a project.

    The build types of project *rootID* are listed, each one is fetched
    individually, and the ``vcs-root`` ID of every VCS root entry is gathered.

    Returns:
        A ``set`` of VCS root IDs (duplicates collapse).
    """
    json_headers = {'Accept': 'application/json'}
    listing_url = "{}/httpAuth/app/rest/projects/{}/buildTypes/".format(TEAMCITY_URL, rootID)
    listing = requests.get(listing_url, headers=json_headers, auth=tc_credentials).json()

    entries = []
    for build in listing["buildType"]:
        build_url = "{}/httpAuth/app/rest/buildTypes/id:{}".format(TEAMCITY_URL, build["id"])
        details = requests.get(build_url, headers=json_headers, auth=tc_credentials).json()
        entries.extend(details["vcs-root-entries"]["vcs-root-entry"])

    return {entry["vcs-root"]["id"] for entry in entries}
def get_vcs_name_root(vcsRootID, yaml_file, isIDL):
    """Work out the new name and ID for a VCS root from a YAML definition.

    Args:
        vcsRootID: ID of the VCS root to look up with ``get_vcs_root``.
        yaml_file: Path of the YAML file holding the properties to apply.
            When it has a ``name`` key, that value is appended to the root's
            current name (after a ``#``) and to its current ID.
        isIDL: When true, a root whose resulting name contains ``idl``
            (case-insensitive) is skipped.

    Returns:
        ``(name, id, properties_config)``, or ``None`` for a skipped root.
    """
    with open(yaml_file, 'r') as ymlfile:
        # An empty YAML document loads as None; treat it as "no properties".
        properties_config = yaml.safe_load(ymlfile) or {}

    # Fetch the root once; both the name and the ID come from this reply.
    vcs_root = get_vcs_root(vcsRootID)
    name_vcs_root = vcs_root["name"]
    id_vcs_root = vcs_root["id"]
    if "name" in properties_config:
        suffix = properties_config["name"]
        name_vcs_root = name_vcs_root + "#" + suffix
        id_vcs_root = id_vcs_root + suffix

    if isIDL and "idl" in str(name_vcs_root).lower():
        return None
    return name_vcs_root, id_vcs_root, properties_config
def set_unique_vcs_root(vcsRootID, yaml_file, isIDL=False):
    """Apply the YAML-defined properties, name and ID to one VCS root.

    Args:
        vcsRootID: Current ID of the VCS root to modify.
        yaml_file: Path of the YAML definition (see ``get_vcs_name_root``).
        isIDL: Forwarded to ``get_vcs_name_root``; a skipped root is left
            untouched.

    The updates are best-effort: an exception or a non-success HTTP status
    is printed and does not abort the program.
    """
    resolved = get_vcs_name_root(vcsRootID, yaml_file, isIDL)
    if resolved is None:
        return
    name_vcs_root, id_vcs_root, properties_config = resolved
    if name_vcs_root is None or id_vcs_root is None:
        return

    base_url = "{}/httpAuth/app/rest/vcs-roots/{}".format(TEAMCITY_URL, vcsRootID)
    updates = [
        ("{}/properties/{}".format(base_url, key), value)
        for key, value in properties_config.items()
    ]
    updates.append((base_url + "/name", name_vcs_root))
    # The ID change goes last: every URL above addresses the root by its
    # current ID. NOTE(review): assumes the server stops resolving the old ID
    # once it has been changed -- confirm against the TeamCity REST docs.
    updates.append((base_url + "/id", id_vcs_root))

    try:
        for url, payload in updates:
            response = requests.put(url, data=payload, auth=tc_credentials)
            if not response.ok:
                print("Update of {} failed with HTTP status {}".format(url, response.status_code))
    except Exception as ex:
        # Deliberately broad: this operation has always been best-effort.
        print(ex)
def set_multiple_vcs_root(vcsRootID, yaml_file):
    """Apply *yaml_file* to each item that ``list_vcs_root(vcsRootID)`` yields.

    Items whose text contains ``idl`` (case-insensitive) are skipped. The
    rest go to ``set_unique_vcs_root`` with ``isIDL=True``.
    """
    candidates = (item for item in list_vcs_root(vcsRootID) if "idl" not in item.lower())
    for vcs_id in candidates:
        set_unique_vcs_root(vcs_id, yaml_file, True)
def list_vcs_roots(build_types, outfile):
    """Write each build type's name and branch specification to *outfile*.

    For every item in *build_types*, a line with its name is written,
    followed by one line per branch-spec entry from ``get_branch_spec``, or
    a placeholder line when no branch specification is found.
    """
    ids = [item.id for item in build_types]
    if TEAMCITY_DEBUG:
        print("child project ids: ", ids)
    names = [item.name for item in build_types]

    for name, child in zip(names, ids):
        outfile.write("\t\t-" + str(name) + "\n")
        builds = get_branch_spec(child)
        if not builds:
            outfile.write("\t\t\t(No build branches)" + "\n")
            continue
        if TEAMCITY_DEBUG:
            print("builds found: '" + builds + "'\n")
        for thisbranch in builds.split("\n"):
            outfile.write("\t\t\t" + thisbranch + "\n")
def export_list(projectID):
    """Write a parent/child listing of a project to ``<projectID>-file.txt``.

    The file starts with the project ID. If the project has build types of
    its own, their names and branch specs are listed directly. Otherwise each
    sub-project ID is written, followed by the names and branch specs of that
    sub-project's build types (see ``list_vcs_roots``).
    """
    # The with-block closes the file even if a TeamCity call below raises.
    with open(projectID + "-file.txt", "w") as f:
        f.write(" " + str(projectID) + "\n")
        parent_projects_list = get_project(projectID)
        if TEAMCITY_DEBUG: print("List of projects: ", parent_projects_list.projects)
        if TEAMCITY_DEBUG: print("List of project build types: ", parent_projects_list.build_types)
        is_subproject = bool(parent_projects_list.build_types)
        if TEAMCITY_DEBUG: print("Is subproject? ", is_subproject)

        if is_subproject:
            list_vcs_roots(parent_projects_list.build_types, f)
            return

        child_project_ids = [x.id for x in parent_projects_list.projects.project]
        if TEAMCITY_DEBUG: print("child projects", child_project_ids)
        for subproject in child_project_ids:
            if TEAMCITY_DEBUG: print("processing", subproject)
            f.write(" \t" + subproject + "\n")
            project_info = get_project(subproject)
            if TEAMCITY_DEBUG: print("project info", project_info)
            list_vcs_roots(project_info.build_types, f)
def exit_with_error(parser, status):
    """Print the parser's help text and terminate with exit code *status*.

    Args:
        parser: The ``optparse.OptionParser`` whose help is shown.
        status: Process exit status.

    Raises:
        SystemExit: always.
    """
    parser.print_help()
    # sys.exit, not the bare exit(): the latter is an interactive helper
    # injected by the site module and is not guaranteed to exist.
    sys.exit(status)
def exit_with_message(message, status):
    """Print *message* to stderr and terminate with exit code *status*.

    Raises:
        SystemExit: always.
    """
    eprint(message)
    # sys.exit, not the bare exit(): the latter is an interactive helper
    # injected by the site module and is not guaranteed to exist.
    sys.exit(status)
def eprint(*args, **kwargs):
    """``print`` to standard error; takes the same arguments as ``print``.

    Passing ``file=`` is an error, because the stream is fixed to stderr.
    """
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
def dprint(message):
    """Print *message* to stderr, but only while ``TEAMCITY_DEBUG`` is true."""
    if not TEAMCITY_DEBUG:
        return
    eprint(message)
def dpprint(message):
    """Pretty-print *message* to stdout, but only while ``TEAMCITY_DEBUG`` is true."""
    if not TEAMCITY_DEBUG:
        return
    import pprint as pretty
    pretty.pprint(message)
def enable_debug():
    """Turn on debug output for this script and its HTTP libraries.

    Sets ``TEAMCITY_DEBUG`` and lowers the ``urllib3`` and ``dohq_teamcity``
    loggers to DEBUG.
    """
    global TEAMCITY_DEBUG
    import logging

    eprint("Enabling debug urllib3/dohq_teamcity output")
    # Without a handler, logging's last-resort handler only emits WARNING and
    # above, so the DEBUG records enabled below would never be shown.
    # basicConfig() installs a stderr handler on the root logger and does
    # nothing if one is already configured.
    logging.basicConfig()
    for logger_name in ("urllib3", "dohq_teamcity"):
        logging.getLogger(logger_name).setLevel(logging.DEBUG)
    TEAMCITY_DEBUG = True
def set_vcs_root_property(vcsRootID, property, value):
    """PUT *value* as property *property* of the VCS root *vcsRootID*.

    The response object, its headers and its text are echoed via ``dprint``.
    The HTTP status is not checked.
    """
    dprint("About to set vcsRootId {} property {} => {}".format(vcsRootID, property, value))
    endpoint = "{}/httpAuth/app/rest/vcs-roots/{}/properties/{}".format(
        TEAMCITY_URL, vcsRootID, property)
    reply = requests.put(endpoint, data=value, auth=tc_credentials)
    for debug_item in (reply, reply.headers, reply.text):
        dprint(debug_item)
def set_vcs_root_properties(build_type, feature_definition_file):
    """Copy ``branch`` and ``teamcity:branchSpec`` from a YAML file to a VCS root.

    The target is the ``id`` of the first VCS root entry of *build_type*.
    A key missing from the YAML file is skipped; the ``KeyError`` is only
    reported through ``dprint``.
    """
    with open(feature_definition_file, 'r') as ymlfile:
        feature_config = yaml.safe_load(ymlfile)
    vcsRootID = get_vcs_root_from_build(build_type)['id']

    for property_name in ('branch', 'teamcity:branchSpec'):
        try:
            set_vcs_root_property(vcsRootID, property_name, feature_config[property_name])
        except KeyError as missing:
            dprint(missing)
def read_commit_status_publisher_properties(feature_config, projectID):
    """Build the property list for a ``commit-status-publisher`` feature.

    Four settings are copied from *feature_config* (``KeyError`` if one is
    missing). ``vcsRootId`` is taken from *feature_config* when present and
    otherwise looked up with ``get_vcs_root_id_by_project(projectID)``.

    Returns:
        A list of five ``dohq_teamcity.ModelProperty`` objects.
    """
    copied_keys = (
        "github_authentication_type",
        "github_host",
        "publisherId",
        "secure:github_access_token",
    )
    properties_list = [
        dohq_teamcity.ModelProperty(name=key, value=feature_config[key])
        for key in copied_keys
    ]

    if 'vcsRootId' in feature_config:
        vcsRootID = feature_config['vcsRootId']
    else:
        vcsRootID = get_vcs_root_id_by_project(projectID)
    properties_list.append(dohq_teamcity.ModelProperty(name="vcsRootId", value=vcsRootID))
    return properties_list
def read_xml_report_plugin_properties(feature_config):
    """Build the property list for an ``xml-report-plugin`` feature.

    Both settings are copied from *feature_config*; a missing key raises
    ``KeyError``.
    """
    copied_keys = ("xmlReportParsing.reportType", "xmlReportParsing.reportDirs")
    return [
        dohq_teamcity.ModelProperty(name=key, value=feature_config[key])
        for key in copied_keys
    ]
def read_feature_from_file(feature_definition_file, projectID):
    """Build a ``dohq_teamcity.Feature`` from a YAML definition file.

    The YAML key ``feature_type`` selects the property reader. Any type other
    than ``commit-status-publisher`` or ``xml-report-plugin`` ends the
    program through ``exit_with_message`` with status 2.
    """
    with open(feature_definition_file, 'r') as ymlfile:
        feature_config = yaml.safe_load(ymlfile)

    feature = dohq_teamcity.Feature()
    feature.type = feature_config['feature_type']

    if feature.type == "commit-status-publisher":
        properties_list = read_commit_status_publisher_properties(feature_config, projectID)
    elif feature.type == "xml-report-plugin":
        properties_list = read_xml_report_plugin_properties(feature_config)
    else:
        exit_with_message("Unsupported feature type: {}".format(feature.type), 2)

    feature.properties = dohq_teamcity.Properties(
        count=len(properties_list), _property=properties_list)
    dpprint(feature)
    return feature
def test_github_connection(repo, token):
    """Check that *token* grants push permission on the repository API URL.

    Args:
        repo: Full API URL of the repository.
        token: Access token sent in the ``Authorization`` header.

    Returns:
        The truthy ``permissions.push`` value from the reply, or ``False``
        when the reply is not JSON, lacks that field, or denies push access.
    """
    try:
        response = requests.get(repo, headers={'Authorization': 'token {}'.format(token)})
        dprint(response.text)
        permission_to_push = response.json()['permissions']['push']
    except (KeyError, TypeError, ValueError):
        # KeyError/TypeError: reply without a permissions.push field.
        # ValueError: reply body is not JSON.
        # The token is a secret, so it is deliberately left out of messages.
        eprint("Invalid token for '{}'".format(repo))
        eprint("please review your feature commit-status-publisher.yaml file or use the -f/--force flags to bypass this step")
        return False

    if permission_to_push:
        return permission_to_push
    eprint("Token has no push permission for '{}'".format(repo))
    return False
# https://stackoverflow.com/questions/3663450/python-remove-substring-only-at-the-end-of-string
def rchop(thestring, ending):
    """Return *thestring* with one trailing occurrence of *ending* removed.

    The string is returned unchanged when it does not end with *ending* or
    when *ending* is empty.
    """
    # The empty ending needs its own guard: every string "ends with" "", and
    # thestring[:-0] is thestring[:0], which would wipe the whole string.
    if ending and thestring.endswith(ending):
        return thestring[:-len(ending)]
    return thestring
def extract_github_repo(vcs_root):
    """Return the ``owner/repo`` path from a VCS root's ``url`` property.

    Two URL forms are understood:

    * scp-like, e.g. ``git@github.scm.tripwire.com:tripwire/cap-reference_plugin.git``
    * scheme-based, e.g. ``https://host/tripwire/cap-reference_plugin.git``

    A trailing ``.git`` is dropped. Returns ``None`` when the root has no
    ``url`` property.
    """
    from urllib.parse import urlparse

    for prop in vcs_root["properties"]["property"]:
        if prop["name"] != "url":
            continue
        url = prop["value"]
        if "://" in url:
            # Scheme-based URL: splitting on ":" would cut right after the
            # scheme, so take the path component instead.
            repo_path = urlparse(url).path.lstrip("/")
        else:
            repo_path = url.split(":")[1]
        return rchop(repo_path, ".git")
    return None
def validate_feature(feature):
    """Check whether *feature* can be applied.

    An ``xml-report-plugin`` feature is always valid. For a
    ``commit-status-publisher`` feature, the GitHub token is tested against
    the repository behind the configured VCS root (see
    ``test_github_connection``). Any other feature type is invalid.

    Returns:
        A truthy value when the feature is valid, ``False`` otherwise.
    """
    if feature.type == "xml-report-plugin":
        return True
    if feature.type != "commit-status-publisher":
        return False

    values = {prop.name: prop.value for prop in feature.properties._property}
    required = ("vcsRootId", "github_host", "secure:github_access_token")
    missing = [name for name in required if name not in values]
    if missing:
        # Without this guard a missing property surfaced as UnboundLocalError.
        eprint("Feature is missing required properties: {}".format(", ".join(missing)))
        return False

    vcs_root = get_vcs_root(values["vcsRootId"])
    github_repo = extract_github_repo(vcs_root)
    if not github_repo:
        eprint("VCS root '{}' has no usable url property".format(values["vcsRootId"]))
        return False

    github_url = values["github_host"] + "/repos/" + github_repo
    return test_github_connection(github_url, values["secure:github_access_token"])
def user_confirms(message):
    """Ask a yes/no/quit question until a valid answer is given.

    Only the first character of the stripped, lower-cased reply is examined.

    Returns:
        ``True`` for an answer starting with ``y``, ``False`` for ``n``.
        An answer starting with ``q`` ends the program through
        ``exit_with_message`` with status 2. Anything else, including an
        empty reply, prompts again.
    """
    # A loop instead of recursion: repeated invalid input must not grow the
    # call stack.
    while True:
        answer = str(input(message)).lower().strip()
        if not answer:
            print("Please enter a valid answer")
        elif answer[0] == 'y':
            return True
        elif answer[0] == 'n':
            return False
        elif answer[0] == 'q':
            exit_with_message("Aborting execution ...", 2)
        else:
            print('Invalid Input')
def add_feature_to_project_or_build(projectOrBuildID, feature_definition_file, *args, **kwargs):
    """Add the feature from a YAML file to a project's jobs or to one job.

    *projectOrBuildID* is first tried as a project; each of its build types
    is then processed with ``build_feature``. If no project matches, it is
    tried as a build type ID.

    Args:
        projectOrBuildID: Project ID/web URL, or build type ID.
        feature_definition_file: Path of the YAML feature definition.
        build_type_regex: Optional keyword. When given, only build types
            whose id or name CONTAINS this text are processed (a plain
            substring test, despite the name).

    Ends the program with status 2 when the file or the target is not found.
    """
    build_type_regex = kwargs.get('build_type_regex', None)
    if not os.path.exists(feature_definition_file):
        exit_with_message("Feature definition file '{}' couldn't be found".format(feature_definition_file), 2)

    project = get_project(projectOrBuildID)
    if project:
        feature = read_feature_from_file(feature_definition_file, projectOrBuildID)
        for build_type in project.build_types:
            if build_type_regex and not (
                    build_type_regex in build_type.id or build_type_regex in build_type.name):
                continue
            tc = teamcity_connection()  # required due to short default urllib3 timeouts
            build_type_details = tc.build_types.get(build_type)
            build_feature(build_type_details, feature, feature_definition_file)
        return

    build_type_details = get_build(projectOrBuildID)
    if not build_type_details:
        exit_with_message("Project '{}' couldn't be found at {}".format(projectOrBuildID, TEAMCITY_URL), 2)
    feature = read_feature_from_file(feature_definition_file, projectOrBuildID)
    build_feature(build_type_details, feature, feature_definition_file)


def build_feature(build_type, feature, feature_definition_file=None):
    """Add *feature* to *build_type* unless it is already configured.

    A job counts as configured when it already has a feature of the same
    type carrying the marker property checked below. Otherwise the feature
    is validated (unless forced). Nothing is changed in dry-run mode. In
    interactive mode the user is asked first.

    Args:
        build_type: Build type details (needs ``name``, ``id``, ``features``).
        feature: The ``dohq_teamcity.Feature`` to add.
        feature_definition_file: Optional path of the YAML definition. When
            given, its ``branch``/``teamcity:branchSpec`` values are applied
            to the job's VCS root before the feature is added.
    """
    already_configured = False
    dprint("About to process {} / {}".format(build_type.name, build_type.id))
    for build_type_feature in build_type.features:
        if build_type_feature.type != feature.type:
            continue
        for feature_property in build_type_feature.properties._property:
            if feature.type == "commit-status-publisher" and \
                    feature_property.name == "github_host" and \
                    feature_property.value == "https://github.scm.tripwire.com/api/v3":
                already_configured = True
                dpprint(build_type_feature)
            if feature.type == "xml-report-plugin" and \
                    feature_property.name == "xmlReportParsing.reportType" and \
                    feature_property.value == "junit":
                already_configured = True
                dpprint(build_type_feature)

    if already_configured:
        print("Job {} has been already configured, skipping".format(build_type.id))
        return

    print("Job {} has NOT been configured, attempting to do it".format(build_type.id))
    if not TEAMCITY_FORCE_RUN and not validate_feature(feature):
        return
    if TEAMCITY_DRY_RUN:
        print("Running in dry mode, skipping")
        return
    # NOTE(review): the change is applied directly in non-interactive mode
    # and only after confirmation in interactive mode, as the -i help text
    # ("ask before applying changes") describes -- confirm this is intended.
    if TEAMCITY_INTERACTIVE and not user_confirms("Want to apply the change? [Y/n/q]: "):
        return

    # The definition file is an explicit parameter: this function has no
    # other way to reach the path chosen by the caller.
    if feature_definition_file is not None:
        set_vcs_root_properties(build_type.id, feature_definition_file)
    tc = teamcity_connection()
    response = tc.build_type_api.add_feature(build_type.id, body=feature)
    from pprint import pprint
    pprint(response)
def add_feature_to_project_with_build_type_regex(projectID, build_type_regex, feature_definition_file):
    """Positional-argument wrapper around ``add_feature_to_project_or_build``.

    Forwards *build_type_regex* as the ``build_type_regex`` keyword.
    """
    options = {'build_type_regex': build_type_regex}
    add_feature_to_project_or_build(projectID, feature_definition_file, **options)
def main():
    """Parse the command line and run the requested subcommand.

    With no positional argument, the help text is shown and the exit status
    is 0. An unknown subcommand, or one called with too few arguments, shows
    the help text with exit status 1.
    """
    global TEAMCITY_DRY_RUN, TEAMCITY_FORCE_RUN, TEAMCITY_INTERACTIVE

    usage = "\n".join([
        "Usage: %prog [options] [subcommand] [options] ...",
        "Automate Nahuales TeamCity tasks",
        "",
        "Subcommands",
        " list_projects",
        " list_project PROJECT_ID",
        " list_vcs_root VCS_ROOT_ID",
        " add_feature PROJECT_ID FEATURE_FILE_DEFINITION.yaml",
        " add_feature PROJECT_ID JOB_REGEX FEATURE_FILE_DEFINITION.yaml",
        " edit_vcs_root VCS_ROOT_ID VCS_ROOT_DEFINITION.yaml",
        " edit_multiple_vcs_root PROJECT_ID VCS_ROOT_DEFINITION.yaml",
        " export_list PROJECT_ID",
    ])
    parser = optparse.OptionParser(usage=usage)
    parser.add_option('-n', '--dry-run', dest='dry_run',
                      default=False, action='store_true',
                      help='print possible changes without actually applying them')
    parser.add_option('-f', '--force', dest='force_run',
                      default=False, action='store_true',
                      help='apply changes forcefully')
    parser.add_option('-i', '--interactive', dest='interactive',
                      default=False, action='store_true',
                      help='ask before applying changes')
    parser.add_option('-d', '--debug', dest='enable_debug',
                      default=False, action='store_true',
                      help='enable debugging output')
    opts, args = parser.parse_args()
    if not args:
        exit_with_error(parser, 0)

    if opts.dry_run:
        TEAMCITY_DRY_RUN = True
    if opts.force_run:
        TEAMCITY_FORCE_RUN = True
    if opts.interactive:
        TEAMCITY_INTERACTIVE = True
    if opts.enable_debug:
        enable_debug()
    set_credentials()

    subcommand, params = args[0], args[1:]

    def require(count):
        # Check the argument count up front. Catching IndexError around the
        # call would also swallow IndexErrors raised inside the subcommand.
        if len(params) < count:
            exit_with_error(parser, 1)

    if subcommand == "list_projects":
        list_projects()
    elif subcommand == "list_project":
        require(1)
        list_project(params[0])
    elif subcommand == "list_vcs_root":
        require(1)
        # list_vcs_root() only returns its data, so print it here.
        print(json.dumps(list_vcs_root(params[0]), indent=2))
    elif subcommand == "add_feature":
        require(2)
        if len(params) >= 3:
            add_feature_to_project_with_build_type_regex(params[0], params[1], params[2])
        else:
            add_feature_to_project_or_build(params[0], params[1])
    elif subcommand == "del_feature":
        print("TODO")
    elif subcommand == "edit_feature":
        print("TODO")
    elif subcommand == "edit_vcs_root":
        require(2)
        set_unique_vcs_root(params[0], params[1])
    elif subcommand == "edit_multiple_vcs_root":
        require(2)
        set_multiple_vcs_root(params[0], params[1])
    elif subcommand == "export_list":
        require(1)
        export_list(params[0])
    else:
        exit_with_error(parser, 1)


if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement