Advertisement
Guest User

Untitled

a guest
Mar 31st, 2016
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 18.78 KB | None | 0 0
  1. # stdlib
  2. from collections import defaultdict
  3. import re
  4. import time
  5.  
  6. # 3rd party
  7. import requests
  8.  
  9. # project
  10. from checks import AgentCheck
  11. from config import _is_affirmative
  12. from util import headers
  13.  
# Path suffix appended to the instance URL to reach HAProxy's CSV stats page.
STATS_URL = "/;csv;norefresh"
# Identifier used for both the event type and source type of emitted events.
EVENT_TYPE = SOURCE_TYPE_NAME = 'haproxy'
  16.  
  17.  
class Services(object):
    """Constants describing HAProxy's aggregate rows and status strings."""
    # `svname` values HAProxy uses for aggregate (non-host) CSV rows.
    BACKEND = 'BACKEND'
    FRONTEND = 'FRONTEND'
    ALL = (BACKEND, FRONTEND)
    # Known (lowercased) statuses, used when emitting zero-filled gauges.
    ALL_STATUSES = (
        'up', 'open', 'down', 'maint', 'nolb'
    )

    # Maps HAProxy's raw status strings to Datadog service-check statuses.
    STATUSES_TO_SERVICE_CHECK = {
        'UP': AgentCheck.OK,
        'DOWN': AgentCheck.CRITICAL,
        'no check': AgentCheck.UNKNOWN,
        'MAINT': AgentCheck.OK,
    }
  32.  
  33.  
class HAProxy(AgentCheck):
    """Agent check that collects metrics, events and service checks from an
    HAProxy stats endpoint (CSV format)."""

    def __init__(self, name, init_config, agentConfig, instances=None):
        AgentCheck.__init__(self, name, init_config, agentConfig, instances)

        # Host status needs to persist across all checks:
        # maps instance url -> "hostname:service" -> last observed status
        # (None until first seen).
        self.host_status = defaultdict(lambda: defaultdict(lambda: None))

    # Maps a CSV column name to (metric type, metric name suffix).
    METRICS = {
        "qcur": ("gauge", "queue.current"),
        "scur": ("gauge", "session.current"),
        "slim": ("gauge", "session.limit"),
        "spct": ("gauge", "session.pct"),  # Calculated as: (scur/slim)*100
        "stot": ("rate", "session.rate"),
        "bin": ("rate", "bytes.in_rate"),
        "bout": ("rate", "bytes.out_rate"),
        "dreq": ("rate", "denied.req_rate"),
        "dresp": ("rate", "denied.resp_rate"),
        "ereq": ("rate", "errors.req_rate"),
        "econ": ("rate", "errors.con_rate"),
        "eresp": ("rate", "errors.resp_rate"),
        "wretr": ("rate", "warnings.retr_rate"),
        "wredis": ("rate", "warnings.redis_rate"),
        "req_rate": ("gauge", "requests.rate"),  # HA Proxy 1.4 and higher
        "hrsp_1xx": ("rate", "response.1xx"),  # HA Proxy 1.4 and higher
        "hrsp_2xx": ("rate", "response.2xx"),  # HA Proxy 1.4 and higher
        "hrsp_3xx": ("rate", "response.3xx"),  # HA Proxy 1.4 and higher
        "hrsp_4xx": ("rate", "response.4xx"),  # HA Proxy 1.4 and higher
        "hrsp_5xx": ("rate", "response.5xx"),  # HA Proxy 1.4 and higher
        "hrsp_other": ("rate", "response.other"),  # HA Proxy 1.4 and higher
        "qtime": ("gauge", "queue.time"),  # HA Proxy 1.5 and higher
        "ctime": ("gauge", "connect.time"),  # HA Proxy 1.5 and higher
        "rtime": ("gauge", "response.time"),  # HA Proxy 1.5 and higher
        "ttime": ("gauge", "session.time"),  # HA Proxy 1.5 and higher
    }

    SERVICE_CHECK_NAME = 'haproxy.backend_up'
  70.  
  71. def check(self, instance):
  72. url = instance.get('url')
  73. username = instance.get('username')
  74. password = instance.get('password')
  75. collect_aggregates_only = _is_affirmative(
  76. instance.get('collect_aggregates_only', True)
  77. )
  78. collect_status_metrics = _is_affirmative(
  79. instance.get('collect_status_metrics', False)
  80. )
  81.  
  82. collect_status_metrics_by_host = _is_affirmative(
  83. instance.get('collect_status_metrics_by_host', False)
  84. )
  85.  
  86. count_status_by_service = _is_affirmative(
  87. instance.get('count_status_by_service', True)
  88. )
  89.  
  90. tag_service_check_by_host = _is_affirmative(
  91. instance.get('tag_service_check_by_host', False)
  92. )
  93.  
  94. services_incl_filter = instance.get('services_include', [])
  95. services_excl_filter = instance.get('services_exclude', [])
  96.  
  97. self.log.debug('Processing HAProxy data for %s' % url)
  98.  
  99. data = self._fetch_data(url, username, password)
  100.  
  101. process_events = instance.get('status_check', self.init_config.get('status_check', False))
  102.  
  103. self._process_data(
  104. data, collect_aggregates_only, process_events,
  105. url=url, collect_status_metrics=collect_status_metrics,
  106. collect_status_metrics_by_host=collect_status_metrics_by_host,
  107. tag_service_check_by_host=tag_service_check_by_host,
  108. services_incl_filter=services_incl_filter,
  109. services_excl_filter=services_excl_filter,
  110. count_status_by_service=count_status_by_service,
  111. )
  112.  
  113. def _fetch_data(self, url, username, password):
  114. ''' Hit a given URL and return the parsed json '''
  115. # Try to fetch data from the stats URL
  116.  
  117. auth = (username, password)
  118. url = "%s%s" % (url, STATS_URL)
  119.  
  120. self.log.debug("HAProxy Fetching haproxy search data from: %s" % url)
  121.  
  122. r = requests.get(url, auth=auth, headers=headers(self.agentConfig), verify=False)
  123. r.raise_for_status()
  124.  
  125. return r.content.splitlines()
  126.  
    def _process_data(self, data, collect_aggregates_only, process_events, url=None,
                      collect_status_metrics=False, collect_status_metrics_by_host=False,
                      tag_service_check_by_host=False, services_incl_filter=None,
                      services_excl_filter=None, count_status_by_service=True):
        ''' Main data-processing loop. For each piece of useful data, we'll
        either save a metric, save an event or both.

        `data` is the list of CSV lines returned by `_fetch_data`; it is
        returned unchanged at the end.
        '''

        # Split the first line into an index of fields.
        # The line looks like:
        # "# pxname,svname,qcur,qmax,scur,smax,slim,stot,bin,bout,dreq,dresp,ereq,econ,eresp,wretr,wredis,status,weight,act,bck,chkfail,chkdown,lastchg,downtime,qlimit,pid,iid,sid,throttle,lbtot,tracked,type,rate,rate_lim,rate_max,"
        # data[0][2:] drops the leading "# " before splitting.
        fields = [f.strip() for f in data[0][2:].split(',') if f]

        # (service[, hostname], status) -> occurrence count, kept on self so
        # it survives for later inspection.
        self.hosts_statuses = defaultdict(int)

        back_or_front = None

        # Skip the first line, go backwards to set back_or_front:
        # reverse iteration means a section's aggregate row is encountered
        # before the host rows it labels — presumably matching the CSV's row
        # ordering; verify against a real stats payload.
        for line in data[:0:-1]:
            if not line.strip():
                continue

            # Store each line's values in a dictionary.
            data_dict = self._line_to_dict(fields, line)

            if self._is_aggregate(data_dict):
                back_or_front = data_dict['svname']

            self._update_data_dict(data_dict, back_or_front)

            self._update_hosts_statuses_if_needed(
                collect_status_metrics, collect_status_metrics_by_host,
                data_dict, self.hosts_statuses
            )

            if self._should_process(data_dict, collect_aggregates_only):
                # update status
                # Send the list of data to the metric and event callbacks
                self._process_metrics(
                    data_dict, url,
                    services_incl_filter=services_incl_filter,
                    services_excl_filter=services_excl_filter
                )
                if process_events:
                    self._process_event(
                        data_dict, url,
                        services_incl_filter=services_incl_filter,
                        services_excl_filter=services_excl_filter
                    )
                self._process_service_check(
                    data_dict, url,
                    tag_by_host=tag_service_check_by_host,
                    services_incl_filter=services_incl_filter,
                    services_excl_filter=services_excl_filter
                )

        # Per-status counts are only reported when explicitly enabled.
        if collect_status_metrics:
            self._process_status_metric(
                self.hosts_statuses, collect_status_metrics_by_host,
                services_incl_filter=services_incl_filter,
                services_excl_filter=services_excl_filter,
                count_status_by_service=count_status_by_service
            )

        self._process_backend_hosts_metric(
            self.hosts_statuses,
            services_incl_filter=services_incl_filter,
            services_excl_filter=services_excl_filter
        )

        return data
  197.  
  198. def _line_to_dict(self, fields, line):
  199. data_dict = {}
  200. for i, val in enumerate(line.split(',')[:]):
  201. if val:
  202. try:
  203. # Try converting to a long, if failure, just leave it
  204. val = float(val)
  205. except Exception:
  206. pass
  207. data_dict[fields[i]] = val
  208. return data_dict
  209.  
  210. def _update_data_dict(self, data_dict, back_or_front):
  211. """
  212. Adds spct if relevant, adds service
  213. """
  214. data_dict['back_or_front'] = back_or_front
  215. # The percentage of used sessions based on 'scur' and 'slim'
  216. if 'slim' in data_dict and 'scur' in data_dict:
  217. try:
  218. data_dict['spct'] = (data_dict['scur'] / data_dict['slim']) * 100
  219. except (TypeError, ZeroDivisionError):
  220. pass
  221.  
  222. def _is_aggregate(self, data_dict):
  223. return data_dict['svname'] in Services.ALL
  224.  
  225. def _update_hosts_statuses_if_needed(self, collect_status_metrics,
  226. collect_status_metrics_by_host,
  227. data_dict, hosts_statuses):
  228. if data_dict['svname'] == Services.BACKEND:
  229. return
  230. if collect_status_metrics and 'status' in data_dict and 'pxname' in data_dict:
  231. if collect_status_metrics_by_host and 'svname' in data_dict:
  232. key = (data_dict['pxname'], data_dict['svname'], data_dict['status'])
  233. else:
  234. key = (data_dict['pxname'], data_dict['status'])
  235. hosts_statuses[key] += 1
  236.  
  237. def _should_process(self, data_dict, collect_aggregates_only):
  238. """
  239. if collect_aggregates_only, we process only the aggregates
  240. else we process all except Services.BACKEND
  241. """
  242. if collect_aggregates_only:
  243. if self._is_aggregate(data_dict):
  244. return True
  245. return False
  246. elif data_dict['svname'] == Services.BACKEND:
  247. return False
  248. return True
  249.  
  250. def _is_service_excl_filtered(self, service_name, services_incl_filter,
  251. services_excl_filter):
  252. if self._tag_match_patterns(service_name, services_excl_filter):
  253. if self._tag_match_patterns(service_name, services_incl_filter):
  254. return False
  255. return True
  256. return False
  257.  
  258. def _tag_match_patterns(self, tag, filters):
  259. if not filters:
  260. return False
  261. for rule in filters:
  262. if re.search(rule, tag):
  263. return True
  264. return False
  265.  
  266. def _process_backend_hosts_metric(self, hosts_statuses, services_incl_filter=None,
  267. services_excl_filter=None):
  268. agg_statuses = defaultdict(lambda: {'available': 0, 'unavailable': 0})
  269. for host_status, count in hosts_statuses.iteritems():
  270. try:
  271. service, hostname, status = host_status
  272. except Exception:
  273. service, status = host_status
  274.  
  275. if self._is_service_excl_filtered(service, services_incl_filter, services_excl_filter):
  276. continue
  277. status = status.lower()
  278. if 'up' in status:
  279. agg_statuses[service]['available'] += count
  280. elif 'down' in status or 'maint' in status or 'nolb' in status:
  281. agg_statuses[service]['unavailable'] += count
  282. else:
  283. # create the entries for this service anyway
  284. agg_statuses[service]
  285.  
  286. for service in agg_statuses:
  287. tags = ['service:%s' % service]
  288. self.gauge(
  289. 'haproxy.backend_hosts',
  290. agg_statuses[service]['available'],
  291. tags=tags + ['available:true'])
  292. self.gauge(
  293. 'haproxy.backend_hosts',
  294. agg_statuses[service]['unavailable'],
  295. tags=tags + ['available:false'])
  296. return agg_statuses
  297.  
    def _process_status_metric(self, hosts_statuses, collect_status_metrics_by_host,
                               services_incl_filter=None, services_excl_filter=None,
                               count_status_by_service=True):
        # Per-service availability roll-up, reported at the end as
        # haproxy.count_per_status gauges tagged status:available/unavailable.
        agg_statuses = defaultdict(lambda: {'available': 0, 'unavailable': 0})

        # use a counter unless we have a unique tag set to gauge
        counter = defaultdict(int)
        if count_status_by_service and collect_status_metrics_by_host:
            # `service` and `backend` tags will exist, so each key is unique
            # and values can be gauged directly (counter disabled).
            counter = None

        for host_status, count in hosts_statuses.iteritems():
            # Keys are (service, hostname, status) or (service, status)
            # depending on collect_status_metrics_by_host.
            try:
                service, hostname, status = host_status
            except Exception:
                service, status = host_status
            status = status.lower()

            tags = []
            if count_status_by_service:
                tags.append('service:%s' % service)

            if self._is_service_excl_filtered(service, services_incl_filter, services_excl_filter):
                continue

            if collect_status_metrics_by_host:
                tags.append('backend:%s' % hostname)

            self._gauge_all_statuses(
                "haproxy.count_per_status",
                count, status, tags, counter
            )

            # Statuses may count toward both buckets only if the raw string
            # contained both markers; normally exactly one branch matches.
            if 'up' in status or 'open' in status:
                agg_statuses[service]['available'] += count
            if 'down' in status or 'maint' in status or 'nolb' in status:
                agg_statuses[service]['unavailable'] += count

        if counter is not None:
            # send aggregated counts as gauges
            for key, count in counter.iteritems():
                # key is (metric_name, tags_tuple) as built by
                # _gauge_all_statuses.
                metric_name, tags = key[0], key[1]
                self.gauge(metric_name, count, tags=tags)

        for service in agg_statuses:
            for status, count in agg_statuses[service].iteritems():
                tags = ['status:%s' % status]
                if count_status_by_service:
                    tags.append('service:%s' % service)

                self.gauge("haproxy.count_per_status", count, tags=tags)
  349.  
    def _gauge_all_statuses(self, metric_name, count, status, tags, counter):
        # Report one point for the actual status, plus explicit zeros for all
        # other known statuses so their time series don't go stale.
        if counter is not None:
            # Accumulate into `counter`; the caller flushes it as gauges.
            counter_key = tuple([metric_name, tuple(tags + ['status:%s' % status])])
            counter[counter_key] += count
        else:
            # assume we have enough context, just send a gauge
            self.gauge(metric_name, count, tags + ['status:%s' % status])

        # NOTE(review): the zero gauges below bypass `counter` even when it
        # is in use, and only they space-escape the status — presumably
        # intentional, but verify against the counter flush in the caller.
        for state in Services.ALL_STATUSES:
            if state != status:
                self.gauge(metric_name, 0, tags + ['status:%s' % state.replace(" ", "_")])
  361.  
  362. def _process_metrics(self, data, url, services_incl_filter=None,
  363. services_excl_filter=None):
  364. """
  365. Data is a dictionary related to one host
  366. (one line) extracted from the csv.
  367. It should look like:
  368. {'pxname':'dogweb', 'svname':'i-4562165', 'scur':'42', ...}
  369. """
  370. hostname = data['svname']
  371. service_name = data['pxname']
  372. back_or_front = data['back_or_front']
  373. tags = ["type:%s" % back_or_front, "instance_url:%s" % url]
  374. tags.append("service:%s" % service_name)
  375.  
  376. if self._is_service_excl_filtered(service_name, services_incl_filter,
  377. services_excl_filter):
  378. return
  379.  
  380. if back_or_front == Services.BACKEND:
  381. tags.append('backend:%s' % hostname)
  382.  
  383. for key, value in data.items():
  384. if HAProxy.METRICS.get(key):
  385. suffix = HAProxy.METRICS[key][1]
  386. name = "haproxy.%s.%s" % (back_or_front.lower(), suffix)
  387. if HAProxy.METRICS[key][0] == 'rate':
  388. self.rate(name, value, tags=tags)
  389. else:
  390. self.gauge(name, value, tags=tags)
  391.  
    def _process_event(self, data, url, services_incl_filter=None,
                       services_excl_filter=None):
        '''
        Main event processing loop. An event will be created for a service
        status change.
        Service checks on the server side can be used to provide the same functionality
        '''
        hostname = data['svname']
        service_name = data['pxname']
        # Previous status is remembered per instance url and per host:service.
        key = "%s:%s" % (hostname, service_name)
        status = self.host_status[url][key]

        if self._is_service_excl_filtered(service_name, services_incl_filter,
                                          services_excl_filter):
            return

        # First time we see this host: record the status, no event yet.
        if status is None:
            self.host_status[url][key] = data['status']
            return

        if status != data['status'] and data['status'] in ('UP', 'DOWN'):
            # If the status of a host has changed, we trigger an event
            try:
                lastchg = int(data['lastchg'])
            except Exception:
                # Missing/non-numeric lastchg: fall back to "changed now".
                lastchg = 0

            # Create the event object
            ev = self._create_event(
                data['status'], hostname, lastchg, service_name,
                data['back_or_front']
            )
            self.event(ev)

            # Store this host status so we can check against it later
            self.host_status[url][key] = data['status']
  428.  
  429. def _create_event(self, status, hostname, lastchg, service_name, back_or_front):
  430. HAProxy_agent = self.hostname.decode('utf-8')
  431. if status == "DOWN":
  432. alert_type = "error"
  433. title = "%s reported %s:%s %s" % (HAProxy_agent, service_name, hostname, status)
  434. else:
  435. if status == "UP":
  436. alert_type = "success"
  437. else:
  438. alert_type = "info"
  439. title = "%s reported %s:%s back and %s" % (HAProxy_agent, service_name, hostname, status)
  440.  
  441. tags = ["service:%s" % service_name]
  442. if back_or_front == Services.BACKEND:
  443. tags.append('backend:%s' % hostname)
  444. return {
  445. 'timestamp': int(time.time() - lastchg),
  446. 'event_type': EVENT_TYPE,
  447. 'host': HAProxy_agent,
  448. 'msg_title': title,
  449. 'alert_type': alert_type,
  450. "source_type_name": SOURCE_TYPE_NAME,
  451. "event_object": hostname,
  452. "tags": tags
  453. }
  454.  
  455. def _process_service_check(self, data, url, tag_by_host=False,
  456. services_incl_filter=None, services_excl_filter=None):
  457. ''' Report a service check, tagged by the service and the backend.
  458. Statuses are defined in `STATUSES_TO_SERVICE_CHECK` mapping.
  459. '''
  460. service_name = data['pxname']
  461. status = data['status']
  462. haproxy_hostname = self.hostname.decode('utf-8')
  463. check_hostname = haproxy_hostname if tag_by_host else ''
  464.  
  465. if self._is_service_excl_filtered(service_name, services_incl_filter,
  466. services_excl_filter):
  467. return
  468.  
  469. if status in Services.STATUSES_TO_SERVICE_CHECK:
  470. service_check_tags = ["service:%s" % service_name]
  471. hostname = data['svname']
  472. if data['back_or_front'] == Services.BACKEND:
  473. service_check_tags.append('backend:%s' % hostname)
  474.  
  475. status = Services.STATUSES_TO_SERVICE_CHECK[status]
  476. message = "%s reported %s:%s %s" % (haproxy_hostname, service_name,
  477. hostname, status)
  478. self.service_check(self.SERVICE_CHECK_NAME, status, message=message,
  479. hostname=check_hostname, tags=service_check_tags)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement