Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- # ---------------------------------------------------------------------
- # Nateks.NetXpert.get_lldp_neighbors
- # ---------------------------------------------------------------------
- # Copyright (C) 2007-2017 The NOC Project
- # See LICENSE for details
- # ---------------------------------------------------------------------
- # Python modules
- import re
- # NOC modules
- from noc.core.script.base import BaseScript
- from noc.sa.interfaces.igetlldpneighbors import IGetLLDPNeighbors
- from noc.lib.validators import is_int, is_ipv4, is_ipv6, is_mac
- from noc.sa.interfaces.base import MACAddressParameter, IPv4Parameter
class Script(BaseScript):
    """
    Collect LLDP neighbors from a Nateks NetXpert device over the CLI.

    The summary table printed by ``show lldp neighbors`` is used only to
    discover which local interfaces have neighbors; the details of each
    neighbor are then read from ``sh lldp neighbors interface <name>``.
    """
    name = "Nateks.NetXpert.get_lldp_neighbors"
    interface = IGetLLDPNeighbors

    # Header line of the summary table; everything after it is table rows
    rx_summary_split = re.compile(
        r"^Device-ID.+?\n", re.MULTILINE | re.IGNORECASE)
    # Summary row: first column is skipped, second is the local interface
    rx_s_line = re.compile(
        r"^\S+\s*(?P<local_if>(?:Fas|Gig|Te)\d+[\d\/\.]*)")
    rx_chassis_id = re.compile(
        r"^chassis id:\s*(?P<id>\S+)", re.MULTILINE | re.IGNORECASE)
    rx_remote_port = re.compile(
        r"^port id:\s*(?P<remote_if>.+?)\s*$", re.MULTILINE | re.IGNORECASE)
    rx_enabled_caps = re.compile(
        r"^enabled capabilities:\s*(?P<caps>\S*)",
        re.MULTILINE | re.IGNORECASE)
    rx_system = re.compile(
        r"^system name:\s*(?P<name>\S+)", re.MULTILINE | re.IGNORECASE)
    rx_descr = re.compile(
        r"^port description:\s*(?P<descr>.+)$", re.MULTILINE)

    # Capability letter printed by the device -> bit in the capability mask
    CAPABILITY_BITS = {
        "O": 1, "P": 2, "B": 4,
        "W": 8, "R": 16, "T": 32,
        "C": 64, "S": 128
    }

    def execute(self):
        """
        Return the list of local interfaces together with their neighbors.

        :returns: List of dicts with keys ``local_interface`` and
            ``neighbors``. An empty list is returned when the device
            answers with a ``%`` message or prints no summary table.
        :raises NotSupportedError: When the device rejects either LLDP
            command with a CLI syntax error.
        """
        try:
            v = self.cli("show lldp neighbors")
        except self.CLISyntaxError:
            raise self.NotSupportedError()
        if v.startswith("%"):
            return []
        parts = self.rx_summary_split.split(v)
        if len(parts) < 2:
            # No "Device-ID" header line, so there is no table to parse
            return []
        # Rows of the summary table end at the first blank line
        lldp_interfaces = []
        for line in parts[1].splitlines():
            line = line.strip()
            if not line:
                break
            match = self.rx_s_line.match(line)
            if match:
                lldp_interfaces.append(match.group("local_if"))
        r = []
        for local_if in lldp_interfaces:
            try:
                v = self.cli("sh lldp neighbors interface %s" % local_if)
            except self.CLISyntaxError:
                raise self.NotSupportedError()
            # NOTE(review): re_search is inherited from BaseScript;
            # presumably it raises when "port id" is absent - confirm
            match = self.re_search(self.rx_remote_port, v)
            remote_port = match.group("remote_if")
            # LLDP port ID subtypes (IEEE 802.1AB): 1 - interface alias,
            # 3 - MAC address, 4 - network address, 7 - locally assigned
            remote_port_subtype = 1
            if is_ipv4(remote_port):
                remote_port = IPv4Parameter().clean(remote_port)
                remote_port_subtype = 4
            elif is_mac(remote_port):
                remote_port = MACAddressParameter().clean(remote_port)
                remote_port_subtype = 3
            elif is_int(remote_port):
                remote_port_subtype = 7
            n = {
                "remote_port": remote_port,
                "remote_port_subtype": remote_port_subtype
            }
            match = self.rx_descr.search(v)
            if match:
                n["remote_port_description"] = match.group("descr")
            match = self.rx_chassis_id.search(v)
            if not match:
                # Without a chassis id the interface is not reported at all
                continue
            chassis_id = match.group("id")
            n["remote_chassis_id"] = chassis_id
            # LLDP chassis ID subtypes (IEEE 802.1AB): 4 - MAC address,
            # 5 - network address, 7 - locally assigned
            if is_ipv4(chassis_id) or is_ipv6(chassis_id):
                n["remote_chassis_id_subtype"] = 5
            elif is_mac(chassis_id):
                n["remote_chassis_id_subtype"] = 4
            else:
                n["remote_chassis_id_subtype"] = 7
            cap = 0
            match = self.rx_enabled_caps.search(v)
            if match:
                for c in match.group("caps").split(","):
                    # Empty and unknown codes add nothing to the mask,
                    # so one odd letter does not abort the whole collection
                    cap |= self.CAPABILITY_BITS.get(c.strip(), 0)
            n["remote_capabilities"] = cap
            match = self.rx_system.search(v)
            if match:
                n["remote_system_name"] = match.group("name")
            r.append({
                "local_interface": local_if,
                "neighbors": [n]
            })
        return r
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement