Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- ##----------------------------------------------------------------------
- ## Huawei.VRP.get_interfaces
- ##----------------------------------------------------------------------
- ## Copyright (C) 2007-2012 The NOC Project
- ## See LICENSE for details
- ##----------------------------------------------------------------------
- """
- """
- # Python modules
- import re
- from collections import defaultdict
- # NOC modules
- from noc.lib.ip import IPv4
- from noc.sa.script import Script as NOCScript
- from noc.sa.interfaces import IGetInterfaces, InterfaceTypeError, MACAddressParameter
- class Script(NOCScript):
- name = "Huawei.VRP.get_interfaces"
- implements = [IGetInterfaces]
- TIMEOUT = 240
- cache = True
- types = {
- "Eth": "physical",
- "Trunk": "aggregated",
- "VLAN": "SVI",
- "Loopback": "loopback"
- }
- rx_ip_int_descr_2326 = re.compile(r".*(?P<name>Vlanif\d+)\s+(?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\/(?P<mask>\d{2})\s+(?P<oper_status>U|D)\s+(?P<proto_status>U|D)\s+(?P<description>\S*)\n",
- re.MULTILINE | re.IGNORECASE | re.DOTALL)
- rx_if_descr_block = re.compile(
- r"Interface\s+Description$",
- re.MULTILINE | re.DOTALL | re.IGNORECASE)
- rx_if_descr = re.compile(
- r"^\s*(?P<interface>[^ ]+)\s+(?P<description>).*$",
- re.MULTILINE | re.DOTALL | re.IGNORECASE)
- rx_ifc_status = re.compile(
- r"^\s*(?P<interface>[^ ]+) current state :.*?(?P<status>up|down)",
- re.IGNORECASE)
- rx_ifc_block = re.compile(
- r"Interface\s+(PHY|Physical)\s+Protocol[^\n]+\n(?P<block>.*)$",
- re.MULTILINE | re.DOTALL | re.IGNORECASE)
- rx_ifc_br_status = re.compile(
- r"^\s*(?P<interface>[^ ]+)\s+(?P<status>up|down|\*down).*$",
- re.IGNORECASE)
- def execute(self):
- ifaces = {}
- current = None
- is_bundle = False
- is_svi = False
- vlan_ids = []
- mac_svi = ""
- ipv4_svi = ""
- name_ = {}
- mac_ = {}
- descr_ = {}
- status_ = {}
- operstatus_ = {}
- adminstatus_ = {}
- tagged_ = {}
- untagged_ = {}
- loopback_ = {}
- iftype_ = {}
- end_if = False
- ipv4_ = {}
- # Tested only on 2326 3328 5300 5328
- if (not (self.match_version(platform__contains="2326") or self.match_version(platform__contains="5328")
- or self.match_version(platform__contains="5300") or self.match_version(platform__contains="3328"))):
- raise self.NotSupportedError()
- # Get switch MAC, for Huawei it is the same for all Ethernet ports, see 'display interface' for example
- for p in self.scripts.get_arp():
- intf = p["interface"]
- if "Vlanif" in intf:
- mac_svi = p["mac"]
- ipv4_svi = p["ip"]
- # mac_[intf] = p["mac"]
- # ipv4_[intf] = p["ip"]
- # Get interface status, fill MAC and status lists
- for p in self.scripts.get_interface_status():
- intf = p["interface"]
- if "LoopBack" in intf:
- iftype_[intf] = "loopback"
- elif "Ethernet" in intf:
- iftype_[intf] = "physical"
- name_[intf] = intf
- elif "Trunk" in intf:
- iftype_[intf] = "aggregated"
- name_[intf] = intf
- elif "Vlanif" in intf:
- iftype_[intf] = "SVI"
- mac_[intf] = mac_svi
- operstatus_[intf] = p["status"]
- adminstatus_[intf] = p["adminstatus"]
- # Get switchports and fill tagged/untagged/description lists if they are not empty
- for p in self.scripts.get_switchport():
- intf = p["interface"]
- if "tagged" in p and p["tagged"]:
- tagged_[intf] = p["tagged"]
- if "untagged" in p and p["untagged"]:
- untagged_[intf] = p["untagged"]
- if "description" in p and p["description"]:
- descr_[intf] = p["description"]
- # try:
- # interface_descr = self.cli("display interface description", cached=True)
- # except self.CLISyntaxError:
- # raise self.NotSupportedError()
- # ip = match.group("ip")
- # mask = match.group("mask")
- # ip_addr += [IPv4(ip/mask).prefix]
- #
- # if (ls.strip().startswith("Vlanif")):
- # if not ip_addr:
- # continue
- # type = "SVI"
- # vlan_ids = [int(namesviif[7:])]
- # mac_svi = mac_[namesviif]
- # sub = {
- # "name": namesviif,
- # "admin_status": stat == "up",
- # "oper_status": stat == "up",
- # "is_ipv4": True,
- # "ipv4_addresses": ip_addr,
- # "vlan_ids": vlan_ids,
- # "mac": mac_svi,
- # }
- # ifaces[namesviif] = {
- # "name": namesviif,
- # "admin_status": stat == "up",
- # "oper_status": stat == "up",
- # "type": type,
- # "mac": mac_svi,
- # "subinterfaces": [sub],
- # }
- # Pre-process portchannel members
- portchannel_members = {}
- for pc in self.scripts.get_portchannel():
- i = pc["interface"]
- t = pc["type"] == "L"
- for m in pc["members"]:
- portchannel_members[m] = (i, t)
- # Simulate hard working
- # set name ifaces and skip SVI's
- for current in name_:
- is_svi = current.startswith("Vlanif")
- if is_svi:
- continue
- ifaces[current] = {
- "name": current
- }
- # other
- for current in ifaces:
- is_svi = current.startswith("Vlanif")
- if is_svi:
- continue
- is_bundle = current.startswith("Eth-Trunk")
- if is_bundle:
- ifaces[current]["mac"] = mac_[current]
- ifaces[current]["admin_status"] = adminstatus_[current]
- ifaces[current]["oper_status"] = operstatus_[current]
- ifaces[current]["type"] = "aggregated"
- # Sub-interface
- sub = {
- "name": current,
- "admin_status": adminstatus_[current],
- "oper_status": operstatus_[current],
- "is_bridge": True,
- "tagged_vlan": tagged_[current],
- "mac": mac_[current],
- }
- if current in untagged_:
- sub["untagged_vlans"] = untagged_[current]
- if current in descr_:
- ifaces[current]["description"] = descr_[current]
- sub["description"] = descr_[current]
- ifaces[current]["subinterfaces"] = [sub]
- else:
- ifaces[current]["mac"] = mac_[current]
- ifaces[current]["admin_status"] = adminstatus_[current]
- ifaces[current]["oper_status"] = operstatus_[current]
- ifaces[current]["type"] = iftype_[current]
- sub = {
- "name": current,
- "admin_status": adminstatus_[current],
- "oper_status": operstatus_[current],
- "is_bridge": True,
- "mac": mac_[current],
- }
- if current in descr_:
- ifaces[current]["description"] = descr_[current]
- sub["description"] = descr_[current]
- ifaces[current]["subinterfaces"] = [sub]
- if current in tagged_:
- sub["tagged_vlans"] = tagged_[current]
- if current in untagged_:
- sub["untagged_vlan"] = untagged_[current]
- # Portchannel member
- if current in portchannel_members:
- ai, is_lacp = portchannel_members[current]
- ifaces[current]["aggregated_interface"] = ai
- ifaces[current]["is_lacp"] = is_lacp
- # Get VRFs and "default" VRF interfaces
- r = []
- seen = set()
- vpns = [{
- "name": "default",
- "type": "ip",
- "interfaces": []
- }]
- for fi in vpns:
- # Forwarding instance
- rr = {
- "forwarding_instance": fi["name"],
- "type": fi["type"],
- "interfaces": []
- }
- rd = fi.get("rd")
- if rd:
- rr["rd"] = rd
- # create ifaces
- rr["interfaces"] = ifaces.values()
- r += [rr]
- # Return result
- return r
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement