314ma

tplink

Jan 13th, 2021 (edited)
174
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #!/usr/bin/python3
  2. #
  3. # Retrieve port statistics from a TP-Link TL-SG1016DE switch
  4. #
  5. import requests
  6. import sys
  7. import re
  8. import json
  9.  
  10. from requests.exceptions import HTTPError
  11.  
  12. # Change to your needs
  13. ipaddress = '192.168.178.X'
  14. # Default values for the switch
  15. username = 'admin'
  16. password = 'admin'
  17.  
  18. class tplink_stats:
  19.     """Class for retrieving port statistics from a TP-Link switch"""
  20.  
  21.     # descriptive port labels; can be modified to your installation
  22.     port_label = [
  23.         'Port 0',
  24.         'Port 1',
  25.         'Port 2',
  26.         'Port 3',
  27.         'Port 4',
  28.         'Port 5',
  29.         'Port 6',
  30.         'Port 7',
  31.         'Port 8',
  32.         'Port 9',
  33.         'Port 10',
  34.         'Port 11',
  35.         'Port 12',
  36.         'Port 13',
  37.         'Port 14',
  38.         'Port 15'
  39.     ]
  40.     # Remaining parts do not need to be modified
  41.     port_state = [
  42.         'disabled',
  43.         'enabled'
  44.     ]
  45.     link_speed = [
  46.         '0',
  47.         '0',
  48.         '10',
  49.         '10',
  50.         '100',
  51.         '100',
  52.         '1000'
  53.     ]
  54.     duplex_info = [
  55.         'none',
  56.         'auto',
  57.         'half',
  58.         'full',
  59.         'half',
  60.         'full',
  61.         'full'
  62.     ]
  63.  
  64.     def __init__(self, ipaddress, username, password, debug):
  65.         self.ipaddress = ipaddress
  66.         self.referer = 'http://' + self.ipaddress + '/Logout.htm'
  67.         self.username = username
  68.         self.password = password
  69.         self.debug = debug
  70.  
  71.     def login(self):
  72.         url = 'http://' + self.ipaddress + '/logon.cgi'
  73.  
  74.         try:
  75.             if self.debug:
  76.                 print(f'POST {url}')
  77.             response = requests.post(url, headers={'Referer' : self.referer}, data={'username': self.username,'password': self.password, 'logon': 'Login'})
  78.  
  79.             # If the response was successful, no Exception will be raised
  80.             response.raise_for_status()
  81.         except HTTPError as http_err:
  82.             # For some weird reason the switch responds with 401 if authorisation suceeded
  83.             if (http_err.response.status_code != 401):
  84.                 if (self.debug):
  85.                     print(f'POST {url} failed with status {http_err.response.status_code}')
  86.                 return
  87.         except Exception as err:
  88.             if (self.debug):
  89.                 print(f'POST {url} exception {err}')
  90.             return
  91.  
  92.         self.referer = url
  93.         return url
  94.  
  95.     def port_stats(self):
  96.         url = 'http://' + self.ipaddress + '/PortStatisticsRpm.htm'
  97.         try:
  98.             if (self.debug):
  99.                 print(f'GET {url}')
  100.             response = requests.get(url, headers={'Referer' : self.referer})
  101.  
  102.             # If the response was successful, no Exception will be raised
  103.             response.raise_for_status()
  104.         except HTTPError as http_err:
  105.             if (self.debug):
  106.                 print(f'GET {url} failed with status {http_err.response.status_code}')
  107.             return
  108.         except Exception as err:
  109.             if (self.debug):
  110.                 print(f'GET {url} exception {err}')
  111.             return
  112.  
  113.         if (self.debug):
  114.             print(f'Body text: {response.text}')
  115.         self.referer = url
  116.         body = response.text.replace('\n','')
  117.         body.replace('\r','')
  118.  
  119.         m = re.search(r'var all_info = {(.*?)};', body)
  120.         if (m):
  121.             s = re.search(r'state:\[(.*?)\]', m.group(1))
  122.             state = s.group(1).split(',')
  123.             l = re.search(r'link_status:\[(.*?)\]', m.group(1))
  124.             link = l.group(1).split(',')
  125.             p = re.search(r'pkts:\[(.*?)\]', m.group(1))
  126.             pkts = p.group(1).split(',')
  127.             output = []
  128.             for i in range(0, 16):
  129.                 print(f'tplink_port_state,ipaddress="{self.ipaddress}",port={i} label="{self.port_label[i]}",state="{self.port_state[int(state[i])]}",link={self.link_speed[int(link[i])]},duplex="{self.duplex_info[int(link[i])]}",txgood={pkts[i * 4]},txbad={pkts[i * 4 + 1]},rxgood={pkts[i * 4 + 2]},rxbad={pkts[i * 4 + 3]}')
  130.                 output.append({
  131.                     "ipaddress":self.ipaddress,
  132.                     "port":i,
  133.                     "label":self.port_label[i],
  134.                     "state":self.port_state[int(state[i])],
  135.                     "link":self.link_speed[int(link[i])],
  136.                     "duplex":self.duplex_info[int(link[i])],
  137.                     "txgood":pkts[i * 4],
  138.                     "txbad":pkts[i * 4 + 1],
  139.                     "rxgood":pkts[i * 4 + 2],
  140.                     "rxbad":pkts[i * 4 + 3]
  141.                 })
  142.             print(json.dumps(output))
  143.         return url
  144.  
  145.     def logout(self):
  146.         url = 'http://' + self.ipaddress + '/Logout.htm'
  147.         try:
  148.             if (self.debug):
  149.                 print(f'GET {url}')
  150.             response = requests.get(url, headers={'Referer' : self.referer})
  151.  
  152.             # If the response was successful, no Exception will be raised
  153.             response.raise_for_status()
  154.         except HTTPError as http_err:
  155.             # 401 Unauthorized is the 'normal' response code here
  156.             if (http_err.response.status_code != 401):
  157.                 if (self.debug):
  158.                     print(f'GET {url} failed with status {http_err.response.status_code}')
  159.                 return
  160.         except Exception as err:
  161.             if (self.debug):
  162.                 print(f'GET {url} exception {err}')
  163.             return
  164.  
  165.         return url
  166.  
  167. t = tplink_stats(ipaddress, username, password, 0)
  168. login_url = t.login()
  169. if login_url:
  170.     t.port_stats()
  171.     t.logout()
RAW Paste Data