Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """ ntpq out parser and reporter
- """
- import re
- import socket
- # pylint: disable=redefined-builtin
- from ansible.module_utils.basic import AnsibleModule
- # - name: Run the ntp health module against the out from the ntp status command
- # ntp_health:
- # output: "{{ output['stdout'][0] }}" # the output of 'show ntp associations or show ntp peer-status'
- # ntp_servers: "{{ ntp_servers }}" # a list of ntp servers
- # os: "{{ os }}" # netmiko style os (cisco_os, cisco_nxos, cisco_xe etc)
- # domain_name: "company.net" # the domain name to add to ntp servers where their domain gets cut off
- # register: health
- # ignore_errors: true
- #
- # - debug: var=health
- REGEXES = {
- "arista_eos": re.compile(r'''
- ^ # Beginning of line
- (?P<status>[\sx\.\-+#\*o]) # Capture the status
- (?P<remote>\S+)\s+ # The remote name
- (?P<refid>(\d{1,3}.){3}\d{1,3})\s+ # The remote's refid followed by spaces
- (?P<stratum>\d{1,2})\s+ # The stratum of the remote followed by spaces
- (?P<type>[lumb-])\s+ # The type of remote, followed by spaces
- (?P<when>\d+)\s+ # The last time the server was queried
- (?P<poll>\d+)\s+ # Frequency of poll, followed by spaces
- (?P<reach>\d+)\s+ # Reach, followed by spaces
- (?P<delay>[\d\.]*)\s+ # Delay, followed by spaces
- (?P<offset>[\d\.-]*)\s+ # Offset followed by spaces
- (?P<jitter>[\d\.]*) # Jitter
- $ ''', # End of line
- re.VERBOSE),
- "cisco_ios": re.compile(r'''
- ^ # Beginning of line
- (?P<status>[\sx\.\-+#\*o]) # Capture the status
- (?P<configured>\~) # Configured
- (?P<remote>(\d{1,3}.){3}\d{1,3})\s+ # The remote IP followed by spaces
- (?P<refid>(\d{1,3}.){3}\d{1,3})\s+ # The remote's refid followed by spaces
- (?P<stratum>\d{1,2})\s+ # The stratum of the remote followed by spaces
- (?P<when>\d+)\s+ # The last time the server was queried
- (?P<poll>\d+)\s+ # Frequency of poll, followed by spaces
- (?P<reach>\d+)\s+ # Reach, followed by spaces
- (?P<delay>[\d\.]*)\s+ # Delay, followed by spaces
- (?P<offset>[\d\.-]*)\s+ # Offset followed by spaces
- (?P<jitter>[\d\.]*) # Jitter
- $''', # End of line
- re.VERBOSE),
- "cisco_nxos": re.compile(r'''
- ^
- (?P<status>[\*\+=-]) # Beginning of line
- (?P<remote>(\d{1,3}.){3}\d{1,3})\s+ # The remote IP followed by spaces
- (?P<local>(\d{1,3}.){3}\d{1,3})\s+ # The local IP followed by spaces
- (?P<stratum>\d{1,2})\s+ # The stratum of the remote followed by spaces
- (?P<poll>\d+)\s+ # Frequency of poll, followed by spaces
- (?P<reach>\d+)\s+ # Delay, followed by spaces
- (?P<delay>[\d\.]*) # Reach
- (\s+(?P<vrf>\S+))? # spaces, VRF (optional)
- (\s+)? # stoopid eol spaces
- $''', # End of line
- re.VERBOSE),
- "cisco_xr": re.compile(r'''
- ^ # Beginning of line
- (?P<status>[\sx\.\-+#\*o]) # Capture the status
- (?P<configured>\~) # Configured
- (?P<remote>(\d{1,3}.){3}\d{1,3})\s+ # The remote IP followed by spaces
- (vrf\s(?P<vrf>\S+)\s+)? # Optional VRF
- (?P<refid>(\d{1,3}.){3}\d{1,3})\s+ # The remote's refid followed by spaces
- (?P<stratum>\d{1,2})\s+ # The stratum of the remote followed by spaces
- (?P<when>\d+)\s+ # The last time the server was queried
- (?P<poll>\d+)\s+ # Frequency of poll, followed by spaces
- (?P<reach>\d+)\s+ # Reach, followed by spaces
- (?P<delay>[\d\.]*)\s+ # Delay, followed by spaces
- (?P<offset>[\d\.-]*)\s+ # Offset followed by spaces
- (?P<jitter>[\d\.]*) # Jitter
- $''', # End of line
- re.VERBOSE),
- }
- def nxos_vdc(os, output):
- """ Determine if this is a VDC and set desired accordingly
- Args:
- output (str): The output from the ntp status command
- Returns:
- dict: Mock desired since we cna resolve
- """
- if os == "cisco_nxos":
- if "System clock is not controlled by NTP in this VDC" in output:
- return True
- else:
- return False
- def parse_output(params):
- """ Parses the output and returns structured data
- Args:
- output (str): The output from the ntp status command
- Returns:
- dict: A dictionary of entries
- """
- entries = []
- if params['os'] in REGEXES:
- for line in params['output'].splitlines():
- result = re.match(REGEXES[params['os']], line)
- if result:
- entry = result.groupdict()
- entries.append(entry)
- return None, entries
- else:
- return "No regex support for %s" % params['os'], None
- def arista_resolve(entries, domain_name):
- """ Resolves entries to an IP address
- Args:
- entries (dict): The dict of entries
- Returns:
- dict: A dictionary of entries
- """
- for entry in entries:
- try:
- entry['remote'] = socket.gethostbyname("%s.%s" % (entry['remote'].split('.')[0], domain_name))
- except socket.gaierror:
- pass
- return entries
- def xr_unwrap(output):
- """ Unwrap long XR entries
- Args:
- output (str): The str output
- Returns:
- str: Unwrapped output
- """
- lines = output.splitlines()
- i = len(lines) - 1
- newlist = []
- while i >= 0:
- hl_regex = re.compile(r'^\s+(\d{1,3}.){3}\d{1,3}')
- if hl_regex.match(lines[i]):
- newlist.insert(0, "%s %s" % (lines[i-1], lines[i]))
- i -= 2
- else:
- newlist.insert(0, lines[i])
- i -= 1
- lines = "\n".join(newlist)
- return lines
- def find_failed(entries):
- """ Walk the entries and look for stratum 16
- Args:
- entries (dict): A dictionary of NTP server entries
- Return:
- dict: The dict of entries with an additonal k,v for each
- """
- for entry in entries:
- entry['stratum_ok'] = bool(entry['stratum'] != "16")
- return entries
- def resolve_expected(entries):
- """ Resolve each in the list of entries to an IP address
- Args:
- entries (list): A list of NTP servers
- Returns:
- dict: A dict of name, ip
- """
- desired = {}
- for entry in entries:
- try:
- desired[entry] = socket.gethostbyname(entry)
- except socket.gaierror:
- desired[entry] = None
- except Exception as error: # pylint: disable=broad-except
- desired[entry] = str(error)
- return desired
- def roll_up(summary):
- """ Assess each of the individual health check and produce a final health check
- Args:
- summary (dict): A dictionary of summary information
- Returns:
- dict: The same dict, with a health note and health
- """
- summary['healthy_failed_reasons'] = []
- if summary['missing']:
- note = "One or more NTP servers is missing from the output."
- summary['healthy_failed_reasons'].append(note)
- if summary['extra']:
- note = "One or more extra NTP servers found in the output."
- summary['healthy_failed_reasons'].append(note)
- if not summary['stratums_ok']:
- note = "One or more NTP servers found with a stratum of 16."
- summary['healthy_failed_reasons'].append(note)
- if not summary['desired_ok']:
- note = "One or more of the desired NTP servers did not resolve to an IP address."
- summary['healthy_failed_reasons'].append(note)
- if not summary['desired_no_dupes']:
- note = "Duplicate entries found in desired NTP server list."
- summary['healthy_failed_reasons'].append(note)
- if not bool(summary['chosen']):
- note = "No 'chosen' NTP server found in output."
- summary['healthy_failed_reasons'].append(note)
- if not summary['reachability_ok']:
- note = "One or more NTP server is experiencing reachability issues."
- summary['healthy_failed_reasons'].append(note)
- summary['healthy'] = not bool(summary['healthy_failed_reasons'])
- return summary
- def summarize(entries, desired, output):
- """ Use both the output and parsed entries to produce summary information
- Args:
- output (str): The output from the ntpq -pn command
- entries (dict): A dictionary of NTP server entries
- desired (dict): The list of NTP servers, resolved to IPs
- Returns:
- summary (dict): The summary dictionary
- """
- summary = {}
- current_ntp_ip_list = set(x['remote'] for x in entries)
- desired_ntp_ip_list = set(desired.values())
- summary['chosen'] = next((x for x in entries if x['status'] == "*"), None)
- summary['current_ntp_ip_list'] = list(current_ntp_ip_list)
- summary['desired_no_dupes'] = len(desired_ntp_ip_list) == len(desired.values())
- summary['desired_ntp_ip_list'] = list(desired_ntp_ip_list)
- summary['desired_ok'] = all([bool(v) for v in desired.values()])
- summary['desired'] = desired
- summary['entries'] = entries
- summary['extra'] = list(current_ntp_ip_list - desired_ntp_ip_list)
- summary['missing'] = list(desired_ntp_ip_list - current_ntp_ip_list)
- summary['output'] = output.splitlines()
- summary['reachability_issues'] = [x for x in entries if x['reach'] != "377"]
- summary['reachability_ok'] = not bool(summary['reachability_issues'])
- summary['stratums_ok'] = all([x['stratum_ok'] for x in entries])
- summary = roll_up(summary)
- return summary
- def main():
- """ Genisis
- """
- module = AnsibleModule(
- argument_spec=dict(
- ntp_servers=dict(required=True, type='list'),
- output=dict(required=True, type='str'),
- os=dict(required=True, type='str'),
- domain_name=dict(required=False, type='str'),
- ),
- supports_check_mode=True)
- try:
- # Look for VDC warning in NX-OS
- is_vdc = nxos_vdc(module.params['os'], module.params['output'])
- if is_vdc:
- module.exit_json(changed=False, results="Is a NXOS VDC")
- else:
- desired = resolve_expected(module.params['ntp_servers'])
- # Reuse the IOS regex
- if module.params['os'] in ["cisco_xe", "cisco_asa", "cisco_fwsm", "cisco_pix"]:
- module.params['os'] = "cisco_ios"
- # Unwrap XR output
- if module.params['os'] == 'cisco_xr':
- module.params['output'] = xr_unwrap(module.params['output'])
- # Move along
- error, entries = parse_output(module.params)
- if error:
- module.fail_json(msg=error)
- # Resolve arista names back to IPs
- if module.params['os'] == "arista_eos":
- entries = arista_resolve(entries, module.params['domain_name'])
- entries = find_failed(entries)
- summary = summarize(entries, desired, module.params['output'])
- if summary['healthy']:
- module.exit_json(changed=False, results=summary)
- else:
- module.fail_json(msg=(", ".join(summary['healthy_failed_reasons'])), results=summary)
- except Exception as error: # pylint: disable=broad-except
- error_type = error.__class__.__name__
- module.fail_json(msg=error_type + ": " + str(error))
- if __name__ == "__main__":
- main()
Add Comment
Please, Sign In to add comment