Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- import os
- import sys
- import json
- import time
- import errno
- import base64
- import pychrome
- import threading
- import logging
- import psutil as psutil
- from bs4 import BeautifulSoup
- from argparse import ArgumentParser
- from subprocess import Popen, PIPE
- # Cross-compatible import for urlparse
- if sys.version_info >= (3, 0):
- from urllib.parse import urlparse
- if (3, 0) > sys.version_info >= (2, 5):
- from urlparse import urlparse
# Log
# Every record from DEBUG upwards goes to the file 'app.log' (a relative
# path), prefixed with the name of the emitting thread padded to at least
# nine characters.
_LOG_SETTINGS = {
    'filename': 'app.log',
    'level': logging.DEBUG,
    'format': '(%(threadName)-9s) %(message)s',
}
logging.basicConfig(**_LOG_SETTINGS)
# Define CLI Arguments
def _parse_urls(value):
    """Split a comma-separated ``--urls`` value into a list of URL strings.

    Whitespace around each entry is stripped and empty entries (for example
    the one produced by a trailing comma) are dropped.  A ValueError is
    raised when no URL is left; argparse reports a ValueError raised by a
    ``type`` callable as a usage error.
    """
    urls = [item.strip() for item in value.split(',') if item.strip()]
    if not urls:
        raise ValueError('expected at least one URL')
    return urls


def _parse_window_size(value):
    """Parse a ``WIDTH,HEIGHT`` string into a list of two integers.

    Raises ValueError (turned into a usage error by argparse) unless the
    value consists of exactly two non-negative integers, because the rest
    of the script indexes the result with ``[0]`` and ``[1]``.
    """
    size = [int(item) for item in value.split(',')]
    if len(size) != 2 or min(size) < 0:
        raise ValueError('expected WIDTH,HEIGHT as two non-negative integers')
    return size


parser = ArgumentParser()
# --urls is mandatory: without it there is nothing to capture.
parser.add_argument(
    '--urls', required=True, type=_parse_urls,
    help='e.g. --urls=https://google.com,https://facebook.com,https://ebay.com')
# The default is given as a string on purpose: argparse passes string
# defaults through ``type``, so it ends up as [1024, 768].
parser.add_argument(
    '--window-size', type=_parse_window_size, default='1024,768',
    help='e.g. --window-size=1024,768')
parser.add_argument(
    '--user-agent', type=str,
    help='e.g. --user-agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.90 Safari/537.36"')
parser.add_argument('--timeout', help='e.g. --timeout=60', type=int, default=60)
parser.add_argument('--force-kill', help='e.g. --force-kill', action='store_true')
parser.add_argument('-v', '--verbose', help='e.g. -v or --verbose', action='store_true')
# Parse Arguments
args = parser.parse_args()
print(args)

# ``info`` is the script's progress reporter: it prints only when -v/--verbose
# was given and is a no-op otherwise.
if args.verbose:
    def info(*messages):
        """Print all *messages* on a single line, separated by spaces.

        The line is built first and printed with one parenthesised
        ``print`` so the output is the same on Python 2 and Python 3.  The
        previous print-statement form put every argument on its own line
        under Python 3.
        """
        # '%s' formatting (rather than str()) keeps unicode text intact on
        # Python 2.
        print(' '.join('%s' % (message,) for message in messages))
else:
    def info(*messages):
        """Discard *messages*; verbose output was not requested."""
        return None
class Handler(object):
    """Capture the page loaded in one tab to disk from DevTools events.

    The methods are meant to be assigned as the ``Page``/``Network`` event
    callbacks of a single pychrome tab.  Output goes to a directory named
    after the host of the first requested URL, relative to the current
    working directory:

    * ``headers.json``   - headers of the first response received,
    * ``outer.html``     - prettified outer HTML of the document,
    * ``screenshot.png`` - screenshot taken after the visible size has been
      set to the height of ``<body>``.

    The class reads the module-level ``args`` (user agent, window size) and
    reports progress through the module-level ``info``.
    """

    # One lock for all handlers: tab activation and capture run for one
    # tab at a time.
    lock = threading.Lock()

    # Directory used when a URL has no host component (e.g. ``data:`` URLs).
    UNKNOWN_HOST = 'unknown-host'

    def __init__(self, browser, tab):
        """Bind the handler to *tab* and apply the user-agent override.

        :param browser: pychrome browser owning the tab (used to activate it).
        :param tab: pychrome tab whose events this handler receives.
        """
        self.browser = browser
        self.tab = tab
        self.start_frame = None        # id of the first frame seen loading
        self.is_first_request = True
        self.request_id = None
        self.is_first_response = True
        self.url = None                # output directory, set on first request
        if args.user_agent is not None:
            self.tab.Network.setUserAgentOverride(userAgent=args.user_agent)

    @staticmethod
    def _site_name(url):
        """Return the host of *url* without a leading ``www.``.

        Only a leading ``www.`` is removed, so ``awww.example.com`` is left
        untouched.  ``UNKNOWN_HOST`` is returned when *url* is empty or has
        no host component.
        """
        hostname = urlparse(url or '').hostname
        if not hostname:
            return Handler.UNKNOWN_HOST
        if hostname.startswith('www.'):
            hostname = hostname[len('www.'):]
        return hostname

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the directory containing *path* if it does not exist yet.

        An already existing directory is not an error; any other OSError
        is re-raised.
        """
        directory = os.path.dirname(path)
        if not directory:
            return
        try:
            os.makedirs(directory)
        except OSError as exc:
            # Another handler thread may have created it in the meantime.
            if exc.errno != errno.EEXIST:
                raise

    def _output_path(self, filename):
        """Return ``<self.url>/<filename>``, creating the directory first."""
        path = '%s/%s' % (self.url, filename)
        self._ensure_parent_dir(path)
        return path

    def _save_html(self, root_node_id):
        """Write the prettified outer HTML of the document to outer.html."""
        dom = self.tab.DOM.getOuterHTML(nodeId=root_node_id)
        info('Prettify HTML and write to file.')
        soup = BeautifulSoup(dom['outerHTML'], 'html.parser')
        with open(self._output_path('outer.html'), 'wb') as stream:
            stream.write(soup.prettify().encode('utf-8'))

    def _save_screenshot(self, root_node_id):
        """Set the visible size to the body height and save a screenshot."""
        info('Take full page screenshot and write binary to file.')
        width, height = args.window_size[0], args.window_size[1]
        self.tab.Emulation.setDeviceMetricsOverride(
            width=width, height=height,
            deviceScaleFactor=0.0, mobile=False, fitWindow=False)
        body = self.tab.DOM.querySelector(nodeId=root_node_id, selector='body')
        box = self.tab.DOM.getBoxModel(nodeId=body['nodeId'])
        # Use the configured width but the full height of <body>.
        self.tab.Emulation.setVisibleSize(width=width, height=box['model']['height'])
        self.tab.Emulation.forceViewport(x=0, y=0, scale=1)
        screenshot = self.tab.Page.captureScreenshot()
        with open(self._output_path('screenshot.png'), 'wb') as stream:
            # The screenshot arrives base64-encoded.
            stream.write(base64.b64decode(screenshot['data']))

    def frame_started_loading(self, frameId):
        """Remember the first frame that starts loading."""
        if not self.start_frame:
            self.start_frame = frameId

    def frame_stopped_loading(self, frameId):
        """Capture HTML and screenshot once the remembered frame has loaded.

        Events for any other frame are ignored.  The tab is stopped
        afterwards even when the capture raises (presumably this is what
        ends ``tab.wait`` in the caller - confirm against pychrome).
        """
        if self.start_frame != frameId:
            return
        self.tab.Page.stopLoading()
        with self.lock:
            try:
                # Activate Tab
                self.browser.activate_tab(self.tab.id)
                info('Activated Tab: %s' % self.url)
                document = self.tab.DOM.getDocument()
                root_node_id = document['root']['nodeId']
                self._save_html(root_node_id)
                self._save_screenshot(root_node_id)
            finally:
                info('Stop Tab: %s.' % self.url)
                self.tab.stop()

    def request_will_be_sent(self, **kwargs):
        """Derive the output directory name from the first request's URL."""
        if not self.is_first_request:
            return
        self.is_first_request = False
        request = kwargs.get('request') or {}
        # Set the URL we're making the request to.
        self.url = self._site_name(request.get('url'))
        info('Loading: %s' % self.url)

    def response_received(self, **kwargs):
        """Write the headers of the first response to headers.json."""
        if not self.is_first_response:
            return
        self.is_first_response = False
        info('Response Received: %s' % self.url)
        info('Prettify JSON headers and write to file.')
        headers = kwargs.get('response').get('headers')
        with open(self._output_path('headers.json'), 'w') as stream:
            stream.write(json.dumps(headers, indent=2))
# Close all tabs utility
def close_all_tabs(browser, settle_seconds=1):
    """Stop and close every tab currently open in *browser*.

    :param browser: object exposing ``list_tab()`` and ``close_tab(tab)``
        (a ``pychrome.Browser`` in this script).
    :param settle_seconds: how long to wait after closing before checking
        that no tab is left; defaults to the previous fixed one second.
    :raises AssertionError: if tabs are still listed after the wait.
    """
    # Fetch the tab list once; every list_tab() call asks the browser again.
    open_tabs = browser.list_tab()
    if not open_tabs:
        return
    for tab in open_tabs:
        try:
            tab.stop()
        except pychrome.RuntimeException:
            # Best effort: a tab that cannot be stopped is still closed
            # below (presumably raised for tabs that were never started).
            pass
        browser.close_tab(tab)
    # Wait before re-checking (presumably closing is asynchronous).
    time.sleep(settle_seconds)
    remaining = browser.list_tab()
    if remaining:
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped when Python runs with -O.
        raise AssertionError('%d tab(s) still open after closing all tabs' % len(remaining))
def _find_chrome(chrome_arguments):
    """Return the running process started with *chrome_arguments*, or None.

    A process matches when its name is ``chrome`` and its full command line
    equals *chrome_arguments*.
    """
    for process in psutil.process_iter():
        try:
            if process.name() == 'chrome' and process.cmdline() == chrome_arguments:
                return process
        except psutil.Error:
            # The process vanished or may not be inspected; not ours.
            continue
    return None


def _site_directory(url):
    """Return the host of *url* without a leading ``www.``.

    ``unknown-host`` is returned when *url* has no host component (for
    example when it was given without a scheme).
    """
    hostname = urlparse(url or '').hostname
    if not hostname:
        return 'unknown-host'
    if hostname.startswith('www.'):
        hostname = hostname[len('www.'):]
    return hostname


def _write_timeout_error(url):
    """Write ``error.log`` into the output directory belonging to *url*."""
    error_file = '%s/error.log' % _site_directory(url)
    try:
        os.makedirs(os.path.dirname(error_file))
    except OSError as exc:
        # An already existing directory is fine.
        if exc.errno != errno.EEXIST:
            raise
    with open(error_file, 'w') as stream:
        stream.write('Timed out.')


def main():
    """Capture every URL from ``args.urls`` in its own headless Chrome tab.

    Reuses a Chrome process that already runs with the required arguments
    or starts one, opens one tab per URL with a ``Handler`` attached, waits
    up to ``args.timeout`` seconds per tab, and writes ``error.log`` for
    tabs that time out.  With ``--force-kill`` the Chrome process is killed
    at the end.
    """
    if not args.urls:
        sys.exit('No URLs to capture; pass e.g. --urls=https://google.com')

    chrome_arguments = ['/usr/bin/google-chrome', '--headless', '--hide-scrollbars', '--disable-gpu',
                        '--remote-debugging-port=9222']

    # Chrome
    started = None
    if _find_chrome(chrome_arguments):
        info('A Google Chrome process already exists with the arguments we need... we\'ll use that.')
    else:
        info('Starting Chrome.')
        # Both streams go to devnull: nothing ever read the old stdout
        # pipe, and an unread pipe can block the child once it fills up.
        # The child keeps its own copy of the descriptor, so ours can be
        # closed right away.
        with open(os.devnull, 'wb') as devnull:
            started = Popen(chrome_arguments,
                            shell=False,
                            stdout=devnull,
                            stderr=devnull)
        # We have to block for 1s to prevent a race condition.
        time.sleep(1)
    info('Chrome is running... Let\'s interact with in through the chrome dev tools protocol.')

    browser = pychrome.Browser()
    close_all_tabs(browser)

    # Open all tabs first, then attach handlers and start navigating.
    tabs = [browser.new_tab() for _ in args.urls]
    for url, tab in zip(args.urls, tabs):
        eh = Handler(browser, tab)
        tab.Page.frameStartedLoading = eh.frame_started_loading
        tab.Page.frameStoppedLoading = eh.frame_stopped_loading
        tab.Network.requestWillBeSent = eh.request_will_be_sent
        tab.Network.responseReceived = eh.response_received
        tab.Network.enable()
        tab.Page.stopLoading()
        tab.Page.enable()
        tab.Page.navigate(url=url)

    for url, tab in zip(args.urls, tabs):
        if not tab.wait(args.timeout):
            info('Timeout.')
            _write_timeout_error(url)
        info('Close Tab.')
        browser.close_tab(tab)

    if args.force_kill:
        info('Force Kill Chrome.')
        # Prefer the process found by its command line; fall back to the one
        # started above.  Either may be missing, so guard the kill.
        chrome = _find_chrome(chrome_arguments) or started
        if chrome is not None:
            chrome.kill()
        else:
            info('No matching Chrome process found to kill.')
    info('Complete.')


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement