Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ## THINGS TO PERFORM PRIOR TO RUNNING THE CONFIG ##
- # INSTALL PYTHON
- # CREATE VIRTUAL ENVIRONMENT
- # sudo pip install virtualenv
- # virtualenv env
- # source env/bin/activate
- # pip install netmiko paramiko datetime requests getpass4
- # pip install --upgrade setuptools
- # sudo apt-get install build-essential libssl-dev libffi-dev -y
- # CREATE "validation.py" COPY ALL BELOW CODE AND PASTE IT.
- # CREATE "devices-file.txt"
- # PUT IN YOUR DEVICES IN THIS FORMAT BY IP "192.168.1.1" OR DNS NAME "DEVICE001"
- # EXAMPLE 1: abc.local.com,fortinet
- # EXAMPLE 2: 192.168.1.1,fortinet
- # EXAMPLE 3: abc.local.com,cisco_ios
- # EXAMPLE 4: 192.168.1.1,cisco_ios
- # EXAMPLE 5: abc.local.com,nxos
- # EXAMPLE 6: 192.168.1.1,nxos
- # CREATE "command_list.txt"
- # PUT IN THE COMMANDS LIST YOU WOULD WANT TO RUN
- # CISCO:
- # show run
- # show vrf
- # show port-channel summary
- # show etherchannel summary
- # FORTINET:
- # get interface status
- ##========================================================================##
- #! python
- # MUDULES NEEDS TO BE IMPORTED
- from pprint import pprint
- from multiprocessing.dummy import Pool as ThreadPool
- from netmiko import ConnectHandler
- from netmiko.exceptions import NetMikoTimeoutException, AuthenticationException, SSHException
- from datetime import datetime
- from time import time
- from getpass import getpass
- import logging
- import json
- import difflib
- import traceback
- # Configure logging
- logging.basicConfig(level=logging.DEBUG, # Set minimum logging level to DEBUG
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Customize log format
- datefmt='%Y-%m-%d %H:%M:%S', # Customize date format
- handlers=[ # Add handlers for both file and console output
- logging.FileHandler('network_operations.log'), # Log messages to a file
- logging.StreamHandler() # Also log messages to console
- ])
- # Example usage within your script
- logger = logging.getLogger(__name__)
- # At the beginning of your script or main function
- logger.info("Script started")
- # When catching exceptions, use different levels based on severity
- try:
- # Attempt to connect to a device
- pass
- except NetMikoTimeoutException as e:
- logger.warning(f"Timeout while connecting to device: {e}") # Use WARNING for recoverable errors
- except AuthenticationException as e:
- logger.error(f"Authentication failed: {e}") # Use ERROR for more severe issues
- except Exception as e:
- logger.critical(f"Unexpected error: {e}", exc_info=True) # Use CRITICAL for very severe issues and log stack trace
- #
- #
- # READ DEVICE LIST FROM A FILE
- def read_devices(devices_filename):
- devices = {} # CREATE DICTIONART FOR STORING DEVICES
- with open(devices_filename) as devices_file:
- for device_line in devices_file:
- device_info = device_line.strip().split(',') # EXTRACT DEVICE INFORMATION
- device = {
- 'ipaddr': device_info[0],
- 'device_type': device_info[1]
- }
- devices[device['ipaddr']] = device # STORE DEVICE IN THE DEVICES DICTIONARY
- # NOTE THE KEY FOR THE DEVICES DICTIONARY ENTRIES IS "ipaddr"
- print('\n************************** DEVICE LIST ***************************')
- for ipaddr, device in devices.items():
- print(f"IP Address: {ipaddr}, Device Type: {device['device_type']}")
- return devices
- #
- # CONFIGURATION WORKER FUNCTION (SHARED STRUCTURE FOR PRE AND POST VALIDATION)
- def config_worker(device, selection_prefix, username, password):
- max_retries = 3 # Maximum number of retry attempts
- retry_delay = 5 # Time to wait between retries, in seconds
- for attempt in range(max_retries):
- try:
- print(f"...{device['ipaddr']} Sending commands")
- # Use device_type from the device dictionary
- session = ConnectHandler(
- device_type=device['device_type'],
- ip=device['ipaddr'],
- username=username, # Use the passed username
- password=password # Use the passed password
- )
- # The rest of your code for sending commands...
- with open('command_list.txt') as f:
- commands = f.read().splitlines()
- for command in commands:
- command = command.strip()
- print(f"{device['ipaddr']} Sending: {command}")
- result = session.send_command(command, delay_factor=2)
- file_name = f"{device['ipaddr']}_{selection_prefix}.txt"
- with open(file_name, 'a+') as validation2:
- validation2.write(f'\n{command}\n{result}')
- # If commands are sent successfully, break out of the retry loop
- break
- except NetMikoTimeoutException as e:
- logging.error(f'TimeoutException for {device["ipaddr"]} on attempt {attempt+1} at {datetime.now()}: {str(e)}')
- if attempt < max_retries - 1: # Check if we are not on the last attempt
- print(f"Retrying in {retry_delay} seconds...")
- time.sleep(retry_delay)
- else:
- print(f"Failed to connect to {device['ipaddr']} after {max_retries} attempts.")
- except AuthenticationException as e:
- logging.error(f'AuthenticationException for {device["ipaddr"]} at {datetime.now()}: {str(e)}')
- # Authentication errors are not typically retried, so break from the loop
- break
- except SSHException as e:
- logging.error(f'SSHException for {device["ipaddr"]} at {datetime.now()}: {str(e)}')
- # Handle general SSH errors, may consider breaking or retrying based on the error
- break
- except Exception as e: # A generic catch-all for any other exceptions
- logging.error(f'Unhandled exception for {device["ipaddr"]} at {datetime.now()}: {str(e)}')
- break
- #
- # COMPARISON FUNCTION
- def compare_configs():
- try:
- with open("devices-file.txt") as f:
- devices = f.read().splitlines()
- for item in devices:
- # Split the item by commas and keep only the first and last parts for the filename
- parts = item.split(',')
- # Ensure there are enough parts to avoid IndexError
- if len(parts) >= 2:
- simplified_item = f"{parts[0].split('_')[0]}"
- else:
- print(f"Invalid format for device info: {item}")
- continue # Skip to the next item
- config_filename_pre = f"{simplified_item}_pre.txt"
- config_filename_post = f"{simplified_item}_post.txt"
- try:
- with open(config_filename_pre) as f:
- text1_lines = f.read().splitlines()
- with open(config_filename_post) as g:
- text2_lines = g.read().splitlines()
- except FileNotFoundError as e:
- print(f"Error opening file: {e}")
- continue # Skip this device and continue with the next
- difference = difflib.HtmlDiff().make_file(text1_lines, text2_lines, "PRE-VALIDATION", "POST-VALIDATION")
- diff_filename = f"{simplified_item}_difference_report.html"
- with open(diff_filename, 'w') as validation2:
- validation2.write(difference)
- print(f"Report generated for {simplified_item}: {diff_filename}")
- except Exception as e:
- print(f"An error occurred: {e}")
- #
- #
- # BELOW TEXT IS CREATED IN ASCII FORM
- # https://manytools.org/hacker-tools/ascii-banner/
- print ('\n')
- print("████████╗███████╗██╗ ██╗████████╗ ██╗ ██╗███████╗██████╗ ███████╗ ")
- print("╚══██╔══╝██╔════╝╚██╗██╔╝╚══██╔══╝ ██║ ██║██╔════╝██╔══██╗██╔════╝ ")
- print(" ██║ █████╗ ╚███╔╝ ██║ ███████║█████╗ ██████╔╝█████╗ ")
- print(" ██║ ██╔══╝ ██╔██╗ ██║ ██╔══██║██╔══╝ ██╔══██╗██╔══╝ ")
- print(" ██║ ███████╗██╔╝ ██╗ ██║ ██║ ██║███████╗██║ ██║███████╗ ")
- print(" ╚═╝ ╚══════╝╚═╝ ╚═╝ ╚═╝ ╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝╚══════╝ ")
- print ('\n')
- #
- #
- # Get username and password using getpass
- username = input("Enter username: ") # Username is typically not sensitive, so it's okay to use input()
- password = getpass("Enter password: ") # Use getpass for password to hide it while typing
- # USER SELECTION AND PROCESSING
- devices = read_devices('devices-file.txt')
- num_threads = 25 # Adjustable based on need
- valid_selections = ['1', '2', '3', '4']
- Selection = None
- while Selection not in valid_selections:
- print('\n** SELECT YOUR INPUT FROM BELOW **')
- print('\n 1: PRE-VALIDATION\n 2: POST-VALIDATION\n 3: COMPARE-PRE | POST\n 4: EXIT\n')
- Selection = input("Enter your choice (1, 2, 3, or 4): ")
- if Selection not in valid_selections:
- print("Error: Invalid selection. Please choose a valid option.")
- if Selection == "1":
- selection_prefix = 'pre'
- elif Selection == "2":
- selection_prefix = 'post'
- elif Selection == "3":
- compare_configs()
- elif Selection == "4":
- print("Exiting the script.")
- exit()
- if Selection in ['1', '2']:
- config_params_list = [(device, username, password) for ipaddr, device in devices.items()] # Include credentials in the parameters
- starting_time = time()
- print('\n--- Creating threadpool, launching get config threads\n')
- threads = ThreadPool(num_threads)
- results = threads.map(lambda params: config_worker(params[0], selection_prefix, params[1], params[2]), config_params_list)
- threads.close()
- threads.join()
- print('\n---- End get config threadpool, elapsed time=', time() - starting_time)
- elif Selection == "3":
- compare_configs( )
- else:
- print("Error: Invalid selection. Please choose a valid option.")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement