Advertisement
Guest User

Untitled

a guest
Mar 23rd, 2018
116
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.76 KB | None | 0 0
  1. import re
  2. import requests
  3.  
  4. from viewmodels.jenkins_view_model import JobViewModel, BuildViewModel, LastBuildViewModel
  5.  
  6.  
  7. class JenkinsClient(object):
  8.     username = None
  9.     password = None
  10.     jenkins_url = None
  11.  
  12.     def __init__(self, **kwargs):
  13.         try:
  14.             self.username = kwargs.pop('username')
  15.         except KeyError:
  16.             raise 'Jenkins username not provided. Username must be defined before proceding.'
  17.         try:
  18.             self.password = kwargs.pop('password')
  19.         except KeyError:
  20.             raise 'Jenkins password not provided. Password must be defined before proceding.'
  21.         try:
  22.             temp_jenkins_url = kwargs.pop('jenkins_url')
  23.             does_match = re.match(
  24.                 r'(http:\/\/)|(https:\/\/)(www)?', temp_jenkins_url)
  25.             if does_match:
  26.                 self.jenkins_url = re.sub(r'(http:\/\/)|(https:\/\/)(www)?', r'\1%s:%s@' % (
  27.                     self.username, self.password), temp_jenkins_url)
  28.             else:
  29.                 self.jenkins_url = 'http://%s:%s@%s' % (
  30.                     self.username, self.password, temp_jenkins_url)
  31.         except KeyError:
  32.             raise 'Jenkins url not provided. Password must be defined before proceding.'
  33.  
  34.     def list_jobs(self, params=['name', 'color', 'url']):
  35.         response = requests.get(
  36.             '%s/api/json?tree=jobs[%s]' % (self.jenkins_url, ','.join(params)))
  37.         if response.status_code == requests.codes.get('ok'):
  38.             jobs = response.json().pop('jobs', None)
  39.             return [JobViewModel(**job) for job in jobs]
  40.         response.raise_for_status()
  41.  
  42.     def list_builds(self, job_name, params=['number', 'status', 'timestamp', 'id', 'result']):
  43.         if not job_name:
  44.             raise 'The Jenkins job name is required. Please provide one to proced.'
  45.         response = requests.get(
  46.             '%s/job/%s/api/json?tree=builds[%s]' % (self.jenkins_url, job_name, ','.join(params)))
  47.         if response.status_code == requests.codes.get('ok'):
  48.             builds = response.json().pop('builds', None)
  49.             return [BuildViewModel(job_name=job_name, **build) for build in builds]
  50.         response.raise_for_status()
  51.  
  52.     def get_last_build(self, job_name, params=['displayName', 'building', 'duration', 'estimatedDuration', 'id', 'result']):
  53.         if not job_name:
  54.             raise 'The Jenkins job name is required. Please provide one to proced.'
  55.         response = requests.get(
  56.             '%s/job/%s/lastBuild/api/json?tree=%s' % (self.jenkins_url, job_name, ','.join(params)))
  57.         if response.status_code == requests.codes.get('ok'):
  58.             last_build = response.json()
  59.             return LastBuildViewModel(job_name=job_name, **last_build)
  60.         response.raise_for_status()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement