Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python
- import requests
- import time
- import pymysql
- import sys
- import logging
- import pandas as pd
- import urllib3
- import socket
- import requests
- from urllib.parse import urlparse
- urllib3.disable_warnings()
- API = 'https://api.ssllabs.com/api/v2/'
def check_version(url):
    """Return the HTTP protocol version ("1.0", "1.1" or "2") served at *url*.

    Sends a plain GET and inspects the response:
    - an "upgrade" response header (e.g. h2c) is taken to mean HTTP/2;
    - otherwise the raw urllib3 connection version decides (10 -> HTTP/1.0,
      anything else -> HTTP/1.1).
    """
    # verify=False: certificate problems are deliberately ignored here;
    # TLS quality is assessed separately via the SSL Labs scan.
    response = requests.get(url, timeout=10, verify=False)
    upgrade = response.headers.get("upgrade")
    if upgrade is None:  # `is None`, not `== None` (PEP 8)
        # urllib3 exposes the negotiated version as an int: 10 = HTTP/1.0.
        return "1.0" if response.raw.version == 10 else "1.1"
    return "2"
def check_http(results, host):
    """Derive the HTTP version supported by *host* from SSL Labs *results*.

    Reads the first endpoint's negotiated protocol list (npnProtocols):
    "h2"/"h2c" -> "2", "1.1" -> "1.1", otherwise "1.0".  If the scan result
    is missing or has an unexpected shape, falls back to probing the host
    directly with check_version().
    """
    try:
        # Same lookup the original did through a throwaway pandas DataFrame
        # (pd.DataFrame(endpoints).details[0]['npnProtocols']), done directly.
        protocols = results["endpoints"][0]["details"]["npnProtocols"]
    except (KeyError, IndexError, TypeError):
        # Scan failed or result malformed: probe the host directly instead.
        return check_version(host)
    if "h2" in protocols or "h2c" in protocols:
        return "2"
    if "1.1" in protocols:
        return "1.1"
    return "1.0"
def requestAPI(path, payload=None):
    '''
    Helper that takes the path to the relevant API call and the
    user-defined payload and requests the data/server test from
    Qualys SSL Labs.

    Exits the process with status 1 when the HTTP request itself fails.
    Returns JSON formatted data.
    '''
    # `payload=None` instead of a mutable `{}` default (shared across calls).
    if payload is None:
        payload = {}
    url = API + path
    try:
        response = requests.get(url, params=payload)
    # BUG FIX: the module is requests.exceptions (plural).  The original
    # `requests.exception.RequestException` raised AttributeError at the
    # moment a request failed, instead of handling the failure.
    except requests.exceptions.RequestException:
        logging.exception('Request failed.')
        sys.exit(1)
    data = response.json()
    return data
def resultsFromCache(host, publish='off', startNew='off', fromCache='on', all='done'):
    """Fetch cached SSL Labs analysis results for *host* without starting a new scan."""
    return requestAPI('analyze', {
        'host': host,
        'publish': publish,
        'startNew': startNew,
        'fromCache': fromCache,
        'all': all,
    })
def newScan(host, publish='off', startNew='on', all='done', ignoreMismatch='on'):
    """Start a fresh SSL Labs scan of *host* and block until it completes.

    Polls the 'analyze' endpoint every 30 seconds (with startNew removed,
    so each poll re-reads the running scan rather than restarting it) and
    returns the results once the status is READY or ERROR.
    """
    path = 'analyze'
    payload = {
        'host': host,
        'publish': publish,
        'startNew': startNew,
        'all': all,
        'ignoreMismatch': ignoreMismatch,
    }
    results = requestAPI(path, payload)
    # Drop startNew so the polls below attach to the scan we just kicked off.
    del payload['startNew']
    while results['status'] not in ('READY', 'ERROR'):
        time.sleep(30)
        results = requestAPI(path, payload)
    return results
def response_time(url):
    """Return the elapsed time, in seconds, of a GET request to *url*.

    Note: requests' `elapsed` measures time until the response headers
    arrive, not until the body is fully downloaded.
    """
    # Removed the redundant function-local `import requests`; the module
    # is already imported at file level.
    return requests.get(url).elapsed.total_seconds()
def check_ipv6(url):
    """Resolve *url*'s host and return a two-element list [ipv4, ipv6].

    Either slot holds the string "none" when that address family cannot be
    resolved, so callers can always index ip[0] (IPv4) and ip[1] (IPv6).
    """
    ip = []
    # Keep only the network-location (host[:port]) part of the URL.
    parsed_url = urlparse(url).netloc
    try:
        ip.append(socket.gethostbyname(parsed_url))
    except OSError:
        # BUG FIX: an unresolvable host used to raise out of this function
        # and crash the whole script; degrade to "none" like the IPv6
        # branch always did.  socket errors (gaierror) are OSError.
        ip.append("none")
    try:
        ipv6_info = socket.getaddrinfo(parsed_url, None, family=socket.AF_INET6)
        # getaddrinfo entries are (family, type, proto, canonname, sockaddr);
        # sockaddr[0] is the textual IPv6 address.
        ip.append(ipv6_info[-1][-1][0])
    except OSError:
        ip.append("none")
    return ip
def check_uptime(url):
    """Return True when a GET to *url* answers with HTTP 200, else False.

    Any request failure (DNS error, refused connection, timeout, ...) counts
    as the site being down.
    """
    try:
        # timeout added so a hanging server cannot stall the whole run;
        # a timeout is caught below and reported as down.
        response = requests.get(url, timeout=10)
        return response.status_code == 200
    except requests.RequestException:
        # BUG FIX: the original returned int 0 here while the success path
        # returned booleans; the return type is now consistently bool
        # (False == 0, so database callers see the same value).
        return False
def check_site(url):
    """Collect statistics for *url* and persist one row into MySQL.

    Runs an SSL Labs scan, derives the HTTP version, uptime, response time
    and IPv4/IPv6 addresses, then inserts them into the `statistics` table
    of the local `websites` database.
    """
    scan_results = newScan(url)
    http_version = check_http(scan_results, url)
    is_up = check_uptime(url)
    elapsed = response_time(url)
    addresses = check_ipv6(url)
    # Connect to the database.
    # NOTE(review): credentials are hard-coded; consider moving them to
    # environment variables or a config file.
    connection = pymysql.connect(host='localhost',
                                 user='root',
                                 password='Azerty123',
                                 db='websites',
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)
    try:
        with connection.cursor() as cursor:
            # Parameterized query: values are bound by the driver, not
            # interpolated into the SQL string.
            sql = "INSERT INTO statistics (url, http_version, response_time, ipv4, ipv6, up) VALUES (%s,%s,%s,%s,%s,%s)"
            cursor.execute(sql, (url, http_version, elapsed, addresses[0], addresses[1], is_up))
            connection.commit()
    finally:
        connection.close()
def _main():
    """Read the site list from websites.csv and collect stats for each site."""
    websites = pd.read_csv("websites.csv")
    # BUG FIX: iterating a DataFrame (`for site in websites`) yields its
    # COLUMN LABELS, not row values, so the original loop "scanned" the
    # header names instead of the listed sites.  Iterate the first
    # column's values instead.
    # NOTE(review): assumes the CSV's first column holds the URLs — confirm
    # against websites.csv.
    for site in websites.iloc[:, 0]:
        check_site(site)


if __name__ == "__main__":
    _main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement