Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import requests
- import xml.etree.ElementTree as ET
- import time
- from datetime import datetime
- import xml.dom.minidom
- # Configuration
- firewall_ip = "FIREWALLIP" # Replace with your firewall's IP address
- api_key = "API-KEY" # Replace with your generated API key
- threshold = 10 # Threshold in Mbps for high bandwidth consumption
- interval = 10 # Time interval in seconds to monitor
- session_duration_threshold = 600 # Exclude sessions running longer than this (in seconds)
- datetime_format = "%a %b %d %H:%M:%S %Y" # Date format provided by the firewall
- # Function to query the firewall for session information
- def get_session_data():
- url = f"https://{firewall_ip}/api/?type=op&cmd=<show><session><all></all></session></show>&key={api_key}"
- response = requests.get(url, verify=False)
- if response.status_code == 200:
- return response.content
- else:
- print(f"Error: Unable to connect to the firewall. Status Code: {response.status_code}")
- return None
- # Function to pretty print XML data
- def pretty_print_xml(xml_data):
- parsed_xml = xml.dom.minidom.parseString(xml_data)
- pretty_xml = parsed_xml.toprettyxml()
- print("\n--- Complete XML Response ---")
- print(pretty_xml)
- print("--- End of XML Response ---\n")
- # Function to parse XML response and identify high bandwidth consumers
- def parse_session_data(xml_data):
- root = ET.fromstring(xml_data)
- high_bandwidth_consumers = []
- for entry in root.findall(".//entry"):
- start_time_str = entry.find("start-time").text
- start_time = datetime.strptime(start_time_str, datetime_format)
- current_time = datetime.now()
- duration = (current_time - start_time).total_seconds()
- if duration > session_duration_threshold:
- continue # Skip long-running sessions
- source_ip = entry.find("source").text
- destination_ip = entry.find("dst").text
- username = entry.find("username").text if entry.find("username") is not None else "N/A"
- # Get the total bytes (since we only have total-byte-count)
- total_bytes_str = entry.find("total-byte-count").text if entry.find("total-byte-count") is not None else "0"
- # Convert to integer
- total_bytes = int(total_bytes_str)
- # Calculate current throughput in Mbps
- throughput_mbps = (total_bytes * 8) / (duration * 1000000)
- if throughput_mbps > threshold:
- high_bandwidth_consumers.append({
- "source_ip": source_ip,
- "destination_ip": destination_ip,
- "username": username,
- "throughput_mbps": throughput_mbps
- })
- return high_bandwidth_consumers
- # Main loop to monitor bandwidth
- def monitor_bandwidth():
- while True:
- xml_data = get_session_data()
- if xml_data:
- # Display the complete XML response for debugging
- pretty_print_xml(xml_data)
- high_bandwidth_consumers = parse_session_data(xml_data)
- if high_bandwidth_consumers:
- print("High Bandwidth Consumers:")
- for consumer in high_bandwidth_consumers:
- print(f"Source IP: {consumer['source_ip']}, Destination IP: {consumer['destination_ip']}, Username: {consumer['username']}, Throughput: {consumer['throughput_mbps']:.2f} Mbps")
- else:
- print("No high bandwidth consumers at this time.")
- time.sleep(interval)
- # Start monitoring
- if __name__ == "__main__":
- monitor_bandwidth()
Advertisement
Add Comment
Please, Sign In to add comment