Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- ##----------------------------------------------------------------------
- ## Basic IOS XR parser
- ##----------------------------------------------------------------------
- ## Copyright (C) 2007-2015 The NOC Project
- ## See LICENSE for details
- ##----------------------------------------------------------------------
- ## Python modules
- import re
- ## Third-party modules
- from pyparsing import *
- ## NOC modules
- from noc.lib.ip import IPv4
- from noc.cm.parsers.pyparser import BasePyParser
- from noc.cm.parsers.tokens import INDENT, IPv4_ADDRESS, LINE, REST, DIGITS, ALPHANUMS
- from noc.lib.text import ranges_to_list
- class BaseIOSParser(BasePyParser):
- RX_INTERFACE_BLOCK = re.compile(
- r"^interface\s+(?P<name>\S+(?:\s+\d+\S*)?)\n"
- r"(?:\s+[^\n]*\n)*",
- re.MULTILINE
- )
- def __init__(self, managed_object):
- super(BaseIOSParser, self).__init__(managed_object)
- self.enable_cdp = True
- def create_parser(self):
- # System
- HOSTNAME = LineStart() + Literal("hostname") + REST.copy().setParseAction(self.on_hostname)
- DOMAIN_NAME = LineStart() + Literal("domain") + Literal("name") + REST.copy().setParseAction(self.on_domain_name)
- TIMEZONE = LineStart() + Literal("clock") + Literal("timezone") + REST.copy().setParseAction(self.on_timezone)
- NAMESERVER = LineStart() + Literal("domain") + Literal("name-server") + REST.copy().setParseAction(self.on_nameserver)
- # USER = LineStart() + Literal("username") + Word(alphanums + "-_").setParseAction(self.on_user) + REST
- CDP_RUN = LineStart() + (Literal("cdp")).setParseAction(self.on_cdp_run)
- # SSH_VERSION = LineStart() + Literal("ssh") + Literal("server") + (Word(nums) + restOfLine).setParseAction(self.on_ssh_version)
- SYSTEM_BLOCK = (
- HOSTNAME |
- DOMAIN_NAME |
- TIMEZONE |
- NAMESERVER |
- # USER |
- CDP_RUN
- # SERVICE |
- # SSH_VERSION
- )
- # Interface
- INTERFACE = LineStart() + Literal("interface") + REST.copy().setParseAction(self.on_interface)
- INTERFACE_DESCRIPTION = Literal("description") + REST.copy().setParseAction(self.on_interface_descripion)
- INTERFACE_ADDRESS = Literal("ipv4") + Literal("address") + (IPv4_ADDRESS("address") + IPv4_ADDRESS("mask") + Optional(Literal("secondary"))).setParseAction(self.on_interface_address)
- INTERFACE_SHUTDOWN = Literal("shutdown").setParseAction(self.on_interface_shutdown)
- INTERFACE_REDIRECTS = (Literal("ipv4") + Literal("redirects")).setParseAction(self.on_interface_redirects)
- INTERFACE_PROXY_ARP = Literal("proxy-arp").setParseAction(self.on_interface_proxy_arp)
- INTERFACE_CDP = Literal("cdp").setParseAction(self.on_interface_cdp)
- INTERFACE_BLOCK = INTERFACE + ZeroOrMore(INDENT + (
- INTERFACE_DESCRIPTION |
- INTERFACE_ADDRESS |
- INTERFACE_SHUTDOWN |
- INTERFACE_REDIRECTS |
- INTERFACE_PROXY_ARP |
- INTERFACE_CDP |
- LINE
- ))
- # Logging
- LOGGING_HOST = LineStart() + Literal("logging") + IPv4_ADDRESS.copy().setParseAction(self.on_logging_host)
- LOGGING_BLOCK = LOGGING_HOST
- # NTP
- # NTP_SERVER = LineStart() + Literal("ntp") + Literal("server") + IPv4_ADDRESS.copy().setParseAction(self.on_ntp_server)
- # NTP_BLOCK = NTP_SERVER
- CONFIG = (
- SYSTEM_BLOCK |
- INTERFACE_BLOCK |
- LOGGING_BLOCK
- # NTP_BLOCK
- )
- return CONFIG
- def get_interface_defaults(self, name):
- r = {
- "admin_status": True,
- "protocols": []
- }
- # @todo: Replace with more reliable type detection
- if self.enable_cdp and name[:2] in ("Fa", "Gi", "Te"):
- r["protocols"] += ["CDP"]
- return r
- def get_subinterface_defaults(self):
- return {
- "ip_redirects": False,
- "ip_proxy_arp": False,
- "admin_status": True
- }
- def get_user_defaults(self):
- return {
- "level": 0
- }
- def on_hostname(self, tokens):
- self.get_system_fact().hostname = tokens[0]
- def on_domain_name(self, tokens):
- self.get_system_fact().domain_name = tokens[0]
- def on_timezone(self, tokens):
- self.get_system_fact().timezone = tokens[0]
- def on_nameserver(self, tokens):
- self.get_system_fact().nameservers += [tokens[0]]
- def on_user(self, tokens):
- tokens = list(tokens)
- user = self.get_user_fact(tokens[0])
- def on_cdp_run(self, tokens):
- self.enable_cdp = tokens[0]
- def on_ssh_version(self, tokens):
- sf = self.get_service_fact("ssh")
- sf.enabled = True
- sf.version = tokens[0]
- def on_interface(self, tokens):
- name = tokens[0]
- if "." in name:
- i_name = name.split(".")[0]
- else:
- i_name = name
- self.get_subinterface_fact(name, i_name)
- def on_interface_descripion(self, tokens):
- si = self.get_current_subinterface()
- description = tokens[0]
- si.description = description
- if "." not in si.name:
- si.interface.description = description
- def on_interface_address(self, tokens):
- ip = str(IPv4(tokens[0], netmask=tokens[1]))
- si = self.get_current_subinterface()
- if len(tokens) > 2 and tokens[2] == "secondary":
- si.ipv4_addresses += [ip]
- else:
- si.ipv4_addresses = [ip] + si.ipv4_addresses
- si.add_afi("IPv4")
- def on_interface_shutdown(self, tokens):
- status = False
- si = self.get_current_subinterface()
- si.admin_status = status
- if "." not in si.name:
- si.interface.admin_status = status
- def on_interface_cdp(self, tokens):
- self.get_current_interface().add_protocol("CDP")
- def on_interface_redirects(self, tokens):
- self.get_current_subinterface().ip_redirects
- def on_interface_proxy_arp(self, tokens):
- self.get_current_subinterface().ip_proxy_arp = True
- def on_logging_host(self, tokens):
- self.get_sysloghost_fact(tokens[0])
- def on_ntp_server(self, tokens):
- self.get_ntpserver_fact(tokens[0])
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement