Guest User

Untitled

a guest
Apr 8th, 2019
47
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.20 KB | None | 0 0
  1. import base64
  2. import requests
  3.  
  4. INFO = 'api/json'
  5. JOB_INFO = 'job/%s/api/json?depth=0'
  6. JOB_NAME = 'job/%s/api/json?tree=name'
  7. JOB_CONFIG = 'job/%s/config.xml'
  8. CREATE_JOB = 'createItem?name=%s'
  9. ENABLE_JOB = 'job/%s/enable'
  10. DISABLE_JOB = 'job/%s/disable'
  11.  
  12.  
  13. class JenkinsException(Exception):
  14. pass
  15.  
  16.  
  17. class Jenkins(object):
  18.  
  19. def __init__(self, url, username=None, password=None):
  20. if url[-1] == '/':
  21. self.server = url
  22. else:
  23. self.server = url + '/'
  24. if username is not None and password is not None:
  25. self.auth = 'Basic ' + \
  26. base64.encodestring('%s:%s' % (username, password))[:-1]
  27. else:
  28. self.auth = None
  29.  
  30. def create_job(self, name, config_xml):
  31. if self.job_exists(name):
  32. raise JenkinsException('%s already exists' % (name))
  33.  
  34. headers = {'Content-Type': 'text/xml'}
  35. self.send_request(CREATE_JOB % name, method='POST',
  36. data=config_xml, headers=headers)
  37. if not self.job_exists(name):
  38. raise JenkinsException('Create %s failed' % name)
  39. else:
  40. return self.get_job_info(name)
  41.  
  42. def get_job_info(self, name):
  43. try:
  44. resp = self.send_request(JOB_INFO % name)
  45. if resp:
  46. return resp.json()
  47. else:
  48. raise JenkinsException('%s does not exist' % name)
  49. except requests.exceptions, e:
  50. raise JenkinsException('Error while talking to Jenkins: %s' % e)
  51.  
  52. def get_job_name(self, name):
  53. resp = self.send_request(JOB_NAME % name)
  54. if resp:
  55. resp_json = resp.json()
  56. if resp_json['name'] != name:
  57. raise JenkinsException(
  58. 'Jenkins returned different job name')
  59. return resp_json['name']
  60. else:
  61. return None
  62.  
  63. def get_info(self):
  64. try:
  65. return (self.send_request(INFO)).json
  66. except requests.exceptions, e:
  67. raise JenkinsException('Error while talking to Jenkins: %s' % e)
  68.  
  69. def get_jobs(self):
  70. return self.get_info()['jobs']
  71.  
  72. def enable_job(self, name):
  73. self.get_job_info(name)
  74. self.send_request(ENABLE_JOB % name)
  75.  
  76. def disable_job(self, name):
  77. self.get_job_info(name)
  78. self.send_request(DISABLE_JOB % name)
  79.  
  80. def job_exists(self, name):
  81. if self.get_job_name(name) == name:
  82. return True
  83. else:
  84. return False
  85.  
  86. def get_job_config(self, name):
  87. resp = self.send_request(JOB_CONFIG % name)
  88. xml = resp.text
  89. return xml
  90.  
  91. def send_request(self, url, method='GET', data=None, headers=None):
  92. try:
  93. url = self.server + url
  94. resp = None
  95.  
  96. if not headers:
  97. headers = {}
  98. if self.auth:
  99. headers['Authorization'] = self.auth
  100.  
  101. if (method == 'GET'):
  102. resp = requests.get(url, params=data, headers=headers)
  103. elif(method == 'POST'):
  104. resp = requests.post(url, data=data, headers=headers)
  105.  
  106. return resp
  107. except requests.exceptions, e:
  108. raise JenkinsException('Error while talking to Jenkins: %s' % e)
Add Comment
Please, Sign In to add comment