Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- # Slack
- # -*- coding: utf-8 -*-
- # +------------------------------------------------------------------+
- # | ____ _ _ __ __ _ __ |
- # | / ___| |__ ___ ___| | __ | \/ | |/ / |
- # | | | | '_ \ / _ \/ __| |/ / | |\/| | ' / |
- # | | |___| | | | __/ (__| < | | | | . \ |
- # | \____|_| |_|\___|\___|_|\_\___|_| |_|_|\_\ |
- # | |
- # | Copyright Mathias Kettner 2018 mk@mathias-kettner.de |
- # +------------------------------------------------------------------+
- #
- # This file is part of Check_MK.
- # The official homepage is at http://mathias-kettner.de/check_mk.
- #
- # check_mk is free software; you can redistribute it and/or modify it
- # under the terms of the GNU General Public License as published by
- # the Free Software Foundation in version 2. check_mk is distributed
- # in the hope that it will be useful, but WITHOUT ANY WARRANTY; with-
- # out even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- # PARTICULAR PURPOSE. See the GNU General Public License for more de-
- # tails. You should have received a copy of the GNU General Public
- # License along with GNU Make; see the file COPYING. If not, write
- # to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
- # Boston, MA 02110-1301 USA.
- r"""
- Send notification messages to Slack
- ===================================
- Use a slack webhook to send notification messages
- """
- from __future__ import unicode_literals
- import sys
- import requests
- import re
- import os
- from typing import AnyStr, Dict, Optional, Tuple # pylint: disable=unused-import
- import cmk.password_store
# Slack attachment colours keyed by state name. slack_msg() looks up the
# SERVICESTATE / HOSTSTATE value of the notification in this mapping.
COLORS = {
    state: colour
    for colour, states in (
        ("#EE0000", ("CRITICAL", "DOWN")),  # red
        ("#FFDD00", ("WARNING",)),  # yellow
        ("#00CC00", ("OK", "UP")),  # green
        ("#CCCCCC", ("UNKNOWN", "UNREACHABLE")),  # grey
    )
    for state in states
}
def cmk_links(context):
    # type: (Dict) -> Tuple[Optional[str], Optional[str]]
    """Build the absolute host and service URLs for a notification.

    The URL prefix is taken from the first of these that applies:

    * ``PARAMETER_URL_PREFIX`` (if non-empty),
    * ``PARAMETER_URL_PREFIX_MANUAL`` (if non-empty),
    * ``<scheme>://<MONITORING_HOST>/<OMD_SITE>`` when
      ``PARAMETER_URL_PREFIX_AUTOMATIC`` is ``"http"`` or ``"https"``.

    Returns a ``(host_url, service_url)`` tuple. ``service_url`` is ``None``
    unless ``WHAT`` is ``"SERVICE"``; both are ``None`` when no prefix is
    configured.

    Raises ``KeyError`` if a prefix is configured but ``HOSTURL`` (or
    ``SERVICEURL`` for service notifications) is missing from *context*.
    """
    scheme = context.get("PARAMETER_URL_PREFIX_AUTOMATIC")
    if context.get("PARAMETER_URL_PREFIX"):
        url_prefix = context["PARAMETER_URL_PREFIX"]
    elif context.get("PARAMETER_URL_PREFIX_MANUAL"):
        url_prefix = context["PARAMETER_URL_PREFIX_MANUAL"]
    elif scheme in ("http", "https"):
        url_prefix = "%s://%s/%s" % (scheme, context["MONITORING_HOST"], context["OMD_SITE"])
    else:
        url_prefix = None

    if not url_prefix:
        return None, None

    # Strip only a *trailing* "/check_mk" or "/check_mk/". An unanchored
    # pattern would also eat "/check_mk" elsewhere in the prefix, e.g. in a
    # host name such as "http://check_mk.example.com/...".
    # NOTE(review): presumably HOSTURL/SERVICEURL already start with
    # "/check_mk/", which is why the suffix is dropped here - confirm.
    base_url = re.sub(r'/check_mk/?$', '', url_prefix)

    host_url = base_url + context['HOSTURL']
    # .get() keeps this consistent with slack_msg(), which also treats a
    # missing WHAT as a host notification.
    if context.get('WHAT') == 'SERVICE':
        return host_url, base_url + context['SERVICEURL']
    return host_url, None
def extend_context_with_link_urls(context, link_template):
    # type: (Dict, AnyStr) -> None
    """Add ``LINKEDHOSTNAME`` and ``LINKEDSERVICEDESC`` to *context* in place.

    *link_template* is a ``str.format`` template taking two positional
    fields: the URL and the display text. When cmk_links() yields no URL for
    the host or the service, the plain name is stored instead (an empty
    string if there is no ``SERVICEDESC`` at all).
    """
    host_link, service_link = cmk_links(context)

    hostname = context['HOSTNAME']
    context['LINKEDHOSTNAME'] = (
        link_template.format(host_link, hostname) if host_link else hostname
    )

    if service_link:
        linked_service = link_template.format(service_link, context['SERVICEDESC'])
    else:
        linked_service = context.get('SERVICEDESC', '')
    context['LINKEDSERVICEDESC'] = linked_service
def slack_msg(context):
    # type: (Dict) -> Dict
    """Assemble the Slack webhook payload for one notification.

    *context* is extended in place with the ``LINKED*`` entries and then
    rendered into a dict with two attachments: a summary (title, host/service
    and state) and an "Additional Info" block carrying the check output, an
    ``@mention`` for every comma-separated name in ``CONTACTNAME`` and a
    footer with ``LONGDATETIME``. Both attachments share the colour looked up
    in ``COLORS`` for the current state (``None`` for unknown states).
    """
    extend_context_with_link_urls(context, '<{}|{}>')

    # Pick the keys and templates first; the lookups below then run in the
    # same order for both notification kinds.
    if context.get('WHAT', None) == "SERVICE":
        state_key = "SERVICESTATE"
        output_key = "SERVICEOUTPUT"
        title_template = "Service {NOTIFICATIONTYPE} notification"
        text_template = ("Host: {LINKEDHOSTNAME} (IP: {HOSTADDRESS})\n"
                         "Service: {LINKEDSERVICEDESC}\n"
                         "State: {SERVICESTATE}")
    else:
        state_key = "HOSTSTATE"
        output_key = "HOSTOUTPUT"
        title_template = "Host {NOTIFICATIONTYPE} notification"
        text_template = "Host: {LINKEDHOSTNAME} (IP: {HOSTADDRESS})\nState: {HOSTSTATE}"

    color = COLORS.get(context[state_key])
    title = title_template.format(**context)
    text = text_template.format(**context)
    output = context[output_key]

    mentions = ", ".join("@{}".format(name) for name in context["CONTACTNAME"].split(','))

    summary = {
        "color": color,
        "title": title,
        "text": text,
    }
    details = {
        "color": color,
        "title": "Additional Info",
        "text": output + "\nPlease take a look: " + mentions,
        "footer": "Check_MK notification: {LONGDATETIME}".format(**context),
    }
    return {"attachments": [summary, details]}
def collect_context():
    # type: () -> Dict
    """Collect the notification context from the process environment.

    Every environment variable named ``NOTIFY_<NAME>`` becomes a ``<NAME>``
    entry in the returned dict; all other variables are ignored.
    """
    prefix = "NOTIFY_"
    context = {}
    for var, value in os.environ.items():
        if not var.startswith(prefix):
            continue
        # Python 2 yields byte strings here, Python 3 yields text that has no
        # .decode(); only decode when there is actually something to decode.
        if isinstance(value, bytes):
            value = value.decode("utf-8")
        context[var[len(prefix):]] = value
    return context
def retrieve_from_passwordstore(parameter):
    # type: (Optional[str]) -> Optional[str]
    """Resolve a plugin parameter that may point into the password store.

    *parameter* is split on whitespace:

    * ``"store <ident>"`` returns the entry ``<ident>`` from
      ``cmk.password_store.load()`` (``None`` if there is no such entry),
    * any other two-word value returns the second word,
    * any other non-empty value returns its first word.

    ``None``, empty or whitespace-only input returns ``None`` instead of
    raising, so callers can handle "no value" in one place.
    """
    parts = parameter.split() if parameter else []
    if not parts:
        return None

    if len(parts) != 2:
        return parts[0]

    kind, payload = parts
    if kind == 'store':
        return cmk.password_store.load().get(payload)
    return payload
def post_request(message_constructor, success_code=200):
    """Send the notification built by *message_constructor* to the webhook.

    The context is read from the environment via collect_context(), the
    webhook URL from its ``PARAMETER_WEBHOOK_URL`` entry (resolved through
    retrieve_from_passwordstore()). *message_constructor* is called with the
    context and its return value is posted as JSON.

    This function never returns: it exits the process with status 0 when the
    response status equals *success_code*, and with status 2 (after writing
    a message to stderr) when no webhook URL is available or the response
    status differs.
    """
    context = collect_context()

    # Guard before resolving: an unset or blank parameter would otherwise
    # surface as an opaque AttributeError/IndexError traceback.
    webhook_parameter = context.get("PARAMETER_WEBHOOK_URL")
    url = None
    if webhook_parameter and webhook_parameter.strip():
        url = retrieve_from_passwordstore(webhook_parameter)

    if not url:
        sys.stderr.write(
            "Failed to send notification. No webhook URL available "
            "(PARAMETER_WEBHOOK_URL is unset or could not be resolved)\n")
        sys.exit(2)

    response = requests.post(url=url, json=message_constructor(context))
    if response.status_code == success_code:
        sys.exit(0)

    sys.stderr.write(
        "Failed to send notification. Status: %i, Response: %s\n" %
        (response.status_code, response.text))
    sys.exit(2)
if __name__ == "__main__":
    # Script entry point: post the Slack-formatted notification and exit.
    post_request(message_constructor=slack_msg)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement