Advertisement
Guest User

Untitled

a guest
Nov 13th, 2016
90
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.38 KB | None | 0 0
  1. from jenkinsapi.jenkins import Jenkins
  2. import calendar
  3. from datetime import datetime, timedelta
  4. import time
  5.  
  6. ERROR_OCCURRED = 'Sorry, but an error has occurred :('
  7. JOB_ALREADY_ENABLED = 'Job is already enabled!'
  8. JOB_ALREADY_DISABLED = 'Job is already disabled!'
  9. JOB_NOW_ENABLED = 'Job is now enabled'
  10. JOB_NOW_DISABLED = 'Job is now disabled'
  11. JOB_DISABLED = 'Job is disabled'
  12. JOB_NOT_RUNNING = 'There is no job that is currently running...'
  13. NO_SUCH_JOB_NAME = 'There is no job with that name'
  14.  
  15.  
  16. def getServerInstance():
  17.     return Jenkins('http://jenkins-test/', username='jacobk', password='Qwer1234')
  18.  
  19.  
  20. def valitade_job_name(jobName, server):
  21.     status = None
  22.     isExists = True
  23.  
  24.     if not server.has_job(jobName):
  25.         status = NO_SUCH_JOB_NAME
  26.         isExists = False
  27.  
  28.     return status, isExists
  29.  
  30.  
  31. def enable_job(params):
  32.     jobName = params[0]
  33.     server = getServerInstance()
  34.     status, isExist = valitade_job_name(jobName, server)
  35.  
  36.     if isExist:
  37.         job_instance = server.get_job(jobName)
  38.         if job_instance.is_enabled():
  39.             status = JOB_ALREADY_ENABLED
  40.         else:
  41.             job_instance.enable()
  42.             if job_instance.is_enabled():
  43.                 status = JOB_NOW_ENABLED
  44.             else:
  45.                 status = ERROR_OCCURRED
  46.  
  47.     return status
  48.  
  49.  
  50. def disable_job(params):
  51.     jobName = params[0]
  52.     server = getServerInstance()
  53.     status, isExist = valitade_job_name(jobName, server)
  54.  
  55.     if isExist:
  56.         job_instance = server.get_job(jobName)
  57.  
  58.         if not job_instance.is_enabled():
  59.             status = JOB_ALREADY_DISABLED
  60.         else:
  61.             job_instance.disable()
  62.             if not job_instance.is_enabled():
  63.                 status = JOB_NOW_DISABLED
  64.             else:
  65.                 status = ERROR_OCCURRED
  66.  
  67.     return status
  68.  
  69.  
  70. def start_job(params):
  71.     jobName = params[0]
  72.     server = getServerInstance()
  73.     status, isExist = valitade_job_name(jobName, server)
  74.     parameters = None
  75.  
  76.     if isExist:
  77.         job_instance = server.get_job(jobName)
  78.         isActualBuild = False
  79.         if job_instance.is_enabled():
  80.             lastBuild = job_instance.get_last_build()
  81.             status = 'Job: ' + jobName + ' #' + str(lastBuild.get_number())
  82.  
  83.             if lastBuild.is_running():
  84.                 status += ' is still running!'
  85.             else:
  86.                 if len(params) > 0:
  87.                     parameters = {}
  88.                     jobParameters = get_job_params([jobName])
  89.                     for i in range(1, len(params[1:]) + 1):
  90.                         parameters[jobParameters[i - 1]] = params[i]
  91.  
  92.                 server.build_job(jobName, parameters)
  93.                 while not isActualBuild:
  94.                     time.sleep(1)
  95.                     currentBuild = job_instance.get_last_build()
  96.                     if not currentBuild == lastBuild:
  97.                         status += ' started at ' + str(utc_to_local(currentBuild.get_timestamp())) + '\nBuild URL: ' + currentBuild.get_result_url()
  98.                         isActualBuild = True
  99.         else:
  100.             status = JOB_DISABLED
  101.  
  102.     print status
  103.     return status
  104.  
  105.  
  106. def get_job_params(params):
  107.     jobName = params[0]
  108.     server = getServerInstance()
  109.     status, isExist = valitade_job_name(jobName, server)
  110.  
  111.     if isExist:
  112.         status = ''
  113.         job_instance = server.get_job(jobName)
  114.         parameters = job_instance.get_params_list()
  115.         for i in range(0, len(parameters)):
  116.             status += 'Parameter #' + str(i + 1) + ': ' + parameters[i] + '\n'
  117.  
  118.     return status
  119.  
  120.  
  121. def get_job_description(params):
  122.     jobName = params[0]
  123.     server = getServerInstance()
  124.     status, isExist = valitade_job_name(jobName, server)
  125.  
  126.     if isExist:
  127.         job_instance = server.get_job(jobName)
  128.         status = job_instance.get_description()
  129.  
  130.     return status
  131.  
  132.  
  133. def stop_job(params):
  134.     jobName = params[0]
  135.     server = getServerInstance()
  136.     status, isExist = valitade_job_name(jobName, server)
  137.  
  138.     if isExist:
  139.         job_instance = server.get_job(jobName)
  140.         lastBuild = job_instance.get_last_build()
  141.         if lastBuild.is_running():
  142.             lastBuild.stop()
  143.             time.sleep(2)
  144.             if not lastBuild.is_running():
  145.                 status = 'Job: ' + jobName + ' #' + str(lastBuild.get_number()) + ' has been stopped'
  146.             else:
  147.                 status = ERROR_OCCURRED
  148.         else:
  149.             status = JOB_NOT_RUNNING
  150.  
  151.     return status
  152.  
  153.  
  154. def status_job(params):
  155.     global lastBuild, lastFailedBuild, lastSuccessfullBuild, buildNumber
  156.     buildNumber = None
  157.     jobName = params[0]
  158.  
  159.     if len(params) > 1:
  160.         buildNumber = int(params[1])
  161.     server = getServerInstance()
  162.     status, isExist = valitade_job_name(jobName, server)
  163.  
  164.     if isExist:
  165.         job_instance = server.get_job(jobName)
  166.         if buildNumber is not None:
  167.             lastBuild = job_instance.get_build(buildNumber)
  168.             status = 'Job: ' + jobName + ' #' + str(lastBuild.get_number())
  169.             if lastBuild.is_running():
  170.                  status += ' is running'
  171.             else:
  172.                 status += ' finished with status: ' +  lastBuild.get_status()
  173.         else:
  174.             lastBuild = job_instance.get_last_build();
  175.             lastFailedBuild = job_instance.get_build(job_instance.get_last_failed_buildnumber())
  176.             lastSuccessfullBuild = job_instance.get_build(job_instance.get_last_good_buildnumber())
  177.  
  178.             status = 'For job: ' + jobName + '\nLast build #' + str(lastBuild.get_number())
  179.  
  180.             if lastBuild.is_running():
  181.                 status += ' is running'
  182.             else:
  183.                 status += ' finished with status: ' + lastBuild.get_status()
  184.             status += '\nLast failed build: #' + str(lastFailedBuild.get_number()) + ' started at ' + str(utc_to_local(lastFailedBuild.get_timestamp()));
  185.             status += '\nLast Successful build: #' + str(lastSuccessfullBuild.get_number()) + ' started at ' + str(utc_to_local(lastSuccessfullBuild.get_timestamp()));
  186.  
  187.     print status
  188.     return status
  189.  
  190.  
  191. def utc_to_local(utc_dt):
  192.     # get integer timestamp to avoid precision lost
  193.     timestamp = calendar.timegm(utc_dt.timetuple())
  194.     local_dt = datetime.fromtimestamp(timestamp)
  195.     assert utc_dt.resolution >= timedelta(microseconds=1)
  196.     return local_dt.replace(microsecond=utc_dt.microsecond)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement