Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- import random
- import aiohttp
- import asyncio
- import getpass
- import codecs
- import requests
- from urlobject import URLObject
- from lxml import html
- from lxml.etree import XPathEvalError
- from yaml import load_all
- try:
- from yaml import CLoader as Loader, CDumper as Dumper
- except ImportError:
- from yaml import Loader, Dumper
- from urllib import parse as url_parse
- requests.packages.urllib3.disable_warnings()
- class HostProcessor(object):
- status_to_retry = [502, 504]
- max_retry_count = 20
- def __init__(self, name, urls, base_url='', port=None,
- verify=None, auth=None, base_host=None, scheme=None, cookies=None,
- verbose_success=True, verbose_fail=True, username=None, password=None, override_host=None, **kwargs):
- self.name = name
- self.base_host = base_host
- self.scheme = scheme
- self._base_url = URLObject(base_url) or URLObject().with_hostname(base_host)
- if self.scheme:
- self._base_url = self._base_url.with_scheme(scheme)
- self.port = port
- self.urls = urls
- self.verify = verify
- self.auth = auth
- self.cookies = cookies or {}
- self.save_cookie = False
- self._verbose_success = verbose_success
- self._verbose_fail = verbose_fail
- self.test_username = username
- self.test_password = password
- self.fail_count = 0
- if not (override_host is None):
- override_host = URLObject(override_host) or URLObject().with_hostname(override_host)
- self.override_host = override_host
- self.kwargs = {k: v % dict(base_host=base_host) for k, v in kwargs.items()}
- @staticmethod
- def parse_status(status):
- if isinstance(status, int):
- return status,
- if isinstance(status, str):
- return (int(s) for s in status.split(','))
- return status
- @property
- def base_url(self):
- base_url = URLObject(self._base_url % dict(base_host=self.base_host, **self.kwargs))
- if self.port:
- base_url = base_url.with_port(self.port)
- return base_url.with_scheme(self.scheme) if self.scheme else base_url
- def verbose_success(self, name, url, method):
- if not self._verbose_success:
- print('.', end='', flush=True)
- return
- print('\n{name} {url} {method} - OK'.format(name=name, url=url, method=method,))
- def verbose_fail(self, name, url, method, error_message):
- self.fail_count += 1
- if not self._verbose_fail:
- print('Err', end='', flush=True)
- return
- print('\n{name} {url} {method} - ERROR response: {error_message}'.format(
- name=name, url=url, method=method, error_message=error_message))
- async def process_parse(self, name, raw_html, xpath, ensure_count=False, **kwargs):
- tree = html.fromstring(raw_html)
- try:
- tasks = []
- url_list = tree.xpath(xpath)
- for url in url_list:
- url = URLObject(url)
- if url.hostname and url.hostname != self.base_url.hostname:
- continue
- tasks.append(self.process_url(' - '.join([name, url]), url, **kwargs))
- if ensure_count and len(url_list) != ensure_count:
- print('\n{name} - ERROR parse: got {got} expected {expected}'.format(
- name=name, got=len(url_list), expected=ensure_count))
- await asyncio.gather(*tasks)
- except XPathEvalError:
- print('xpath: {}'.format(xpath))
- raise
- async def process_form_data(self, name, url, raw_html, xpath, username=None, password=None,
- username_name='username', password_name='password', status=(302,)):
- status = self.parse_status(status)
- username, password = username or self.test_username, password or self.test_password
- tree = html.fromstring(raw_html)
- try:
- for form in tree.xpath(xpath):
- method = form.attrib.get('method', 'POST')
- action = URLObject(
- form.attrib.get('action', url)
- ).with_hostname(self.base_url.hostname).with_scheme(self.base_url.scheme)
- params = {}
- for input_elem in form.xpath('//input'):
- if 'name' not in input_elem.attrib:
- continue
- if input_elem.attrib['name'] == username_name:
- input_elem.value = username or input('Please enter username: ')
- elif input_elem.attrib['name'] == password_name:
- input_elem.value = password or getpass.getpass()
- params[input_elem.attrib['name']] = input_elem.value or ''
- response_status = await self.get_cookies(method, action, params)
- if response_status not in status:
- self.verbose_fail(name, action, method,
- 'status {} not in {}'.format(response_status, status))
- else:
- self.verbose_success(name, url, method)
- return response_status in status
- except XPathEvalError:
- print('xpath: {}'.format(xpath))
- raise
- async def get_cookies(self, method, url, params, counter=0):
- conn = aiohttp.TCPConnector(verify_ssl=False)
- async with aiohttp.ClientSession(connector=conn, cookies=self.cookies) as session:
- async with session.request(method, url, allow_redirects=False,
- params=params) as response:
- if (response.status in self.status_to_retry
- and counter < self.max_retry_count):
- await asyncio.sleep(1)
- return await self.get_cookies(method, url, counter+1)
- self.cookies.update(response.cookies)
- return response.status
- async def get_raw_html(self, method, url, counter=0):
- conn = aiohttp.TCPConnector(verify_ssl=False)
- async with aiohttp.ClientSession(connector=conn, cookies=self.cookies) as session:
- headers = {}
- if not (self.override_host is None):
- headers = {'Host': url.hostname}
- url = url\
- .with_hostname(self.override_host.hostname)\
- .with_port(self.override_host.port)\
- .with_scheme(self.override_host.scheme)
- async with session.request(method, url, allow_redirects=False, headers=headers) as response:
- if (response.status in self.status_to_retry
- and counter < self.max_retry_count):
- await asyncio.sleep(random.randint(1, 3))
- return await self.get_raw_html(method, url, counter+1)
- raw_html = await response.read()
- if self.save_cookie:
- self.cookies.update(response.cookies)
- return response.status, raw_html
- def format_url(self, url):
- if not (self.override_host is None):
- url = '{} (connected to {})'.format(url, self.override_host)
- return url
- async def process_url(self, name, url, status=(200,), method='get',
- parse=None, form_submit=None):
- status = self.parse_status(status)
- url = URLObject(url).with_hostname(self.base_url.hostname).with_scheme(self.base_url.scheme)
- if self.port:
- url = url.with_port(self.port)
- url = url.with_query('&'.join(k + ('=' + url_parse.quote_plus(v) if v else '') for k, v in url.query_dict.items()))
- response_status, raw_html = await self.get_raw_html(method, url)
- formatted_url = self.format_url(url)
- if response_status not in status:
- self.verbose_fail(
- name, formatted_url, method, 'status {} not in {}'.format(response_status, status))
- else:
- self.verbose_success(name, formatted_url, method)
- tasks = []
- if raw_html and parse:
- for parse_data in parse:
- parse_data['raw_html'] = raw_html
- parse_data['name'] = ' - '.join([name, parse_data['name']])
- tasks.append(self.process_parse(**parse_data))
- await asyncio.gather(*tasks)
- if form_submit:
- return await self.process_form_data(name, url, raw_html, **form_submit)
- async def process(self):
- if self.auth:
- self.save_cookie = True
- self.auth['name'] = ' - '.join([self.name, self.auth['name']])
- if not await self.process_url(**self.auth):
- return
- tasks = []
- for url_data in self.urls:
- if isinstance(url_data, str):
- url_data = {'url': url_data}
- if 'name' not in url_data:
- url_data['name'] = url_data['url']
- url_data['name'] = ' - '.join([self.name, url_data['name']])
- tasks.append(self.process_url(**url_data))
- await asyncio.gather(*tasks)
- async def main(conf_file, global_data):
- exit_code = 0
- with codecs.open(conf_file, 'r', 'utf-8') as conf_file:
- for data in load_all(conf_file, Loader=Loader):
- if isinstance(data, dict):
- global_data = dict(data, **global_data)
- elif isinstance(data, list):
- tasks = []
- processors = []
- for host_data in data:
- host_data.update(global_data)
- processor = HostProcessor(**host_data)
- processors.append(processor)
- tasks.append(processor.process())
- await asyncio.gather(*tasks)
- exit_code = 1 if any(p.fail_count > 0 for p in processors) else 0
- exit(exit_code)
- if __name__ == '__main__':
- parser = argparse.ArgumentParser(description='Smoke test.')
- parser.add_argument('--test-suite', '-s', dest='conf_file', default='smoke-test.yaml',
- help='Test suite yaml file')
- parser.add_argument('--test-user', '-u', dest='test_username', help='Test user for auth')
- parser.add_argument('--test-password', '-p', dest='test_password',
- help='Test password for auth')
- parser.add_argument('--base-host', dest='base_host', help='Base host')
- parser.add_argument('--override-host', dest='override_host', help='Host to connect to instead of base host')
- parser.add_argument('--mobile-host', dest='mobile_host', help='Mobile host')
- parser.add_argument('--scheme', dest='scheme', help='Scheme')
- args = parser.parse_args()
- args_dict = vars(parser.parse_args())
- args_dict.pop('conf_file')
- global_data = {k: v for k, v in args_dict.items() if v is not None}
- loop = asyncio.get_event_loop()
- loop.run_until_complete(main(args.conf_file, global_data))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement