Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import calendar
import os
import time
from datetime import datetime, timedelta

from jenkinsapi.jenkins import Jenkins
# User-facing status messages returned by the job helpers below.

# Generic failure: an action was requested but the follow-up check disagreed.
ERROR_OCCURRED = 'Sorry, but an error has occurred :('

# Enable / disable outcomes.
JOB_ALREADY_ENABLED = 'Job is already enabled!'
JOB_ALREADY_DISABLED = 'Job is already disabled!'
JOB_NOW_ENABLED = 'Job is now enabled'
JOB_NOW_DISABLED = 'Job is now disabled'

# Start / stop / lookup outcomes.
JOB_DISABLED = 'Job is disabled'
JOB_NOT_RUNNING = 'There is no job that is currently running...'
NO_SUCH_JOB_NAME = 'There is no job with that name'
def getServerInstance():
    """Return a new ``Jenkins`` client for the configured server.

    The URL, username and password are read from the ``JENKINS_URL``,
    ``JENKINS_USERNAME`` and ``JENKINS_PASSWORD`` environment variables.
    Any variable that is not set falls back to the value that used to be
    hard-coded here, so existing deployments keep working unchanged.

    Returns:
        A freshly constructed ``Jenkins`` instance; a new one is built on
        every call.
    """
    # NOTE(review): the fallback credentials below are committed in source.
    # They are kept only for backward compatibility -- supply the
    # environment variables instead and rotate/remove these defaults.
    return Jenkins(
        os.environ.get('JENKINS_URL', 'http://jenkins-test/'),
        username=os.environ.get('JENKINS_USERNAME', 'jacobk'),
        password=os.environ.get('JENKINS_PASSWORD', 'Qwer1234'),
    )
def valitade_job_name(jobName, server):
    """Check whether ``server`` knows a job called ``jobName``.

    Args:
        jobName: Name of the job to look up.
        server: Object exposing ``has_job(name)``.

    Returns:
        A ``(status, exists)`` tuple: ``(None, True)`` when the job exists,
        otherwise ``(NO_SUCH_JOB_NAME, False)``.
    """
    if server.has_job(jobName):
        return None, True
    return NO_SUCH_JOB_NAME, False
def enable_job(params):
    """Enable the job named by ``params[0]`` and report what happened.

    Args:
        params: Sequence whose first element is the job name.

    Returns:
        ``NO_SUCH_JOB_NAME`` if the job does not exist,
        ``JOB_ALREADY_ENABLED`` if nothing had to be done,
        ``JOB_NOW_ENABLED`` if the job reports enabled after the call, or
        ``ERROR_OCCURRED`` if it still reports disabled.
    """
    job_name = params[0]
    jenkins = getServerInstance()
    message, found = valitade_job_name(job_name, jenkins)
    if not found:
        return message

    job = jenkins.get_job(job_name)
    if job.is_enabled():
        return JOB_ALREADY_ENABLED

    job.enable()
    # Check the flag again so the reply reflects the state after the call.
    if job.is_enabled():
        return JOB_NOW_ENABLED
    return ERROR_OCCURRED
def disable_job(params):
    """Disable the job named by ``params[0]`` and report what happened.

    Args:
        params: Sequence whose first element is the job name.

    Returns:
        ``NO_SUCH_JOB_NAME`` if the job does not exist,
        ``JOB_ALREADY_DISABLED`` if nothing had to be done,
        ``JOB_NOW_DISABLED`` if the job reports disabled after the call, or
        ``ERROR_OCCURRED`` if it still reports enabled.
    """
    job_name = params[0]
    jenkins = getServerInstance()
    message, found = valitade_job_name(job_name, jenkins)
    if not found:
        return message

    job = jenkins.get_job(job_name)
    if not job.is_enabled():
        return JOB_ALREADY_DISABLED

    job.disable()
    # Check the flag again so the reply reflects the state after the call.
    if not job.is_enabled():
        return JOB_NOW_DISABLED
    return ERROR_OCCURRED
def start_job(params):
    """Trigger a build of the job named by ``params[0]`` and wait for it to appear.

    Any further elements of ``params`` are build-parameter values. They are
    paired positionally with the names returned by the job's
    ``get_params_list()``; values beyond the number of declared parameters
    are ignored.

    Args:
        params: ``[job_name, value1, value2, ...]``.

    Returns:
        A human-readable status string: ``NO_SUCH_JOB_NAME``,
        ``JOB_DISABLED``, a "still running" notice for the current last
        build, or a "started at" message (with build URL) for the newly
        started build. The same string is also printed.
    """
    jobName = params[0]
    server = getServerInstance()
    status, isExist = valitade_job_name(jobName, server)
    if isExist:
        job_instance = server.get_job(jobName)
        if not job_instance.is_enabled():
            status = JOB_DISABLED
        else:
            lastBuild = job_instance.get_last_build()
            if lastBuild.is_running():
                status = ('Job: ' + jobName + ' #' + str(lastBuild.get_number())
                          + ' is still running!')
            else:
                # Map the positional values onto the job's parameter *names*.
                # (get_job_params() returns a formatted display string, so
                # indexing it would yield single characters, not names.)
                parameters = {}
                values = params[1:]
                if values:
                    parameters = dict(zip(job_instance.get_params_list(), values))
                server.build_job(jobName, parameters)

                # Poll until the job's last build is no longer the one seen
                # before triggering.
                # NOTE(review): this loop has no timeout; if the new build
                # never shows up as the last build it will wait forever.
                while True:
                    time.sleep(1)
                    currentBuild = job_instance.get_last_build()
                    # Spelled "not ==" (as before) so only __eq__ is relied on.
                    if not currentBuild == lastBuild:
                        break

                # Report the number of the build that was just started, so it
                # matches the timestamp and URL that follow.
                status = ('Job: ' + jobName + ' #' + str(currentBuild.get_number())
                          + ' started at ' + str(utc_to_local(currentBuild.get_timestamp()))
                          + '\nBuild URL: ' + currentBuild.get_result_url())
    print(status)
    return status
def get_job_params(params):
    """List the build parameters of the job named by ``params[0]``.

    Args:
        params: Sequence whose first element is the job name.

    Returns:
        ``NO_SUCH_JOB_NAME`` if the job does not exist; otherwise a string
        with one ``'Parameter #<n>: <name>\\n'`` line per entry of the job's
        ``get_params_list()`` (numbered from 1), which is empty when the job
        has no parameters.
    """
    job_name = params[0]
    jenkins = getServerInstance()
    message, found = valitade_job_name(job_name, jenkins)
    if not found:
        return message

    names = jenkins.get_job(job_name).get_params_list()
    return ''.join(
        'Parameter #' + str(position) + ': ' + name + '\n'
        for position, name in enumerate(names, 1)
    )
def get_job_description(params):
    """Return the description of the job named by ``params[0]``.

    Args:
        params: Sequence whose first element is the job name.

    Returns:
        ``NO_SUCH_JOB_NAME`` if the job does not exist, otherwise whatever
        the job's ``get_description()`` returns.
    """
    job_name = params[0]
    jenkins = getServerInstance()
    message, found = valitade_job_name(job_name, jenkins)
    if not found:
        return message
    return jenkins.get_job(job_name).get_description()
def stop_job(params):
    """Stop the last build of the job named by ``params[0]`` if it is running.

    Args:
        params: Sequence whose first element is the job name.

    Returns:
        ``NO_SUCH_JOB_NAME`` if the job does not exist,
        ``JOB_NOT_RUNNING`` if its last build is not running,
        a "has been stopped" message if the build no longer reports running
        two seconds after the stop request, or ``ERROR_OCCURRED`` if it
        still does.
    """
    job_name = params[0]
    jenkins = getServerInstance()
    message, found = valitade_job_name(job_name, jenkins)
    if not found:
        return message

    build = jenkins.get_job(job_name).get_last_build()
    if not build.is_running():
        return JOB_NOT_RUNNING

    build.stop()
    # Fixed pause before re-checking -- presumably to give the server time
    # to act on the stop request.
    time.sleep(2)
    if build.is_running():
        return ERROR_OCCURRED
    return 'Job: ' + job_name + ' #' + str(build.get_number()) + ' has been stopped'
def status_job(params):
    """Report the status of a job, or of one specific build of it.

    With only a job name, the report covers the last build plus the last
    failed and last successful builds. With a second element, that element
    is converted with ``int()`` and only that build number is reported.

    Side effects: assigns the module-level names ``buildNumber`` and
    ``lastBuild`` (and, for the summary report, ``lastFailedBuild`` and
    ``lastSuccessfullBuild``), and prints the status string.

    Args:
        params: ``[job_name]`` or ``[job_name, build_number]``.

    Returns:
        The status string, or ``NO_SUCH_JOB_NAME`` if the job does not exist.
    """
    global lastBuild, lastFailedBuild, lastSuccessfullBuild, buildNumber

    def state_suffix(build):
        # Shared wording for "is it running, and if not, how did it end".
        if build.is_running():
            return ' is running'
        return ' finished with status: ' + build.get_status()

    buildNumber = None
    jobName = params[0]
    if len(params) > 1:
        buildNumber = int(params[1])

    server = getServerInstance()
    status, found = valitade_job_name(jobName, server)
    if found:
        job = server.get_job(jobName)
        if buildNumber is None:
            # Summary report: last, last failed and last successful builds.
            lastBuild = job.get_last_build()
            lastFailedBuild = job.get_build(job.get_last_failed_buildnumber())
            lastSuccessfullBuild = job.get_build(job.get_last_good_buildnumber())
            pieces = [
                'For job: ' + jobName + '\nLast build #' + str(lastBuild.get_number()),
                state_suffix(lastBuild),
                '\nLast failed build: #' + str(lastFailedBuild.get_number())
                + ' started at ' + str(utc_to_local(lastFailedBuild.get_timestamp())),
                '\nLast Successful build: #' + str(lastSuccessfullBuild.get_number())
                + ' started at ' + str(utc_to_local(lastSuccessfullBuild.get_timestamp())),
            ]
            status = ''.join(pieces)
        else:
            # Single-build report for the requested build number.
            lastBuild = job.get_build(buildNumber)
            status = 'Job: ' + jobName + ' #' + str(lastBuild.get_number())
            status += state_suffix(lastBuild)
    print(status)
    return status
def utc_to_local(utc_dt):
    """Convert a datetime expressed in UTC to a naive local-time datetime.

    The conversion goes through a whole-second epoch timestamp and then
    restores the original microseconds, so no sub-second precision is lost
    to float rounding.

    Args:
        utc_dt: ``datetime`` whose fields are interpreted as UTC.

    Returns:
        A naive ``datetime`` in the process's local time zone with the same
        ``microsecond`` value as ``utc_dt``.
    """
    # Integer seconds since the epoch; timetuple() carries no microseconds.
    epoch_seconds = calendar.timegm(utc_dt.timetuple())
    localized = datetime.fromtimestamp(epoch_seconds)
    # Sanity check that microsecond resolution is available on the input.
    assert utc_dt.resolution >= timedelta(microseconds=1)
    return localized.replace(microsecond=utc_dt.microsecond)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement