Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- import logging
- import queue
- import re
- import threading
- from urllib.request import HTTPRedirectHandler, Request, build_opener
# Upper bound on the number of concurrent worker threads.
MAX_THREADS = 100

# TODO: add more detection rules
# Error strings whose presence in a response body indicates a leaked SQL error.
SQL_ERROR_RE = re.compile((
    # mysql/mysqli
    r'mysqli?_fetch\w+\(\)|'
    # MySQL syntax error (commonly surfaced through PDO)
    r'You have an error in your SQL syntax|'
    # PostgreSQL
    r'Query failed|'
    # Bitrix
    r'DB query error'))

# Module-level logger shared by all worker threads.
logger = logging.getLogger('SQLiScanner')
class RedirectHandler(HTTPRedirectHandler):
    """Redirect handler that refuses to follow redirects.

    Returning ``None`` from :meth:`redirect_request` tells urllib not to
    build a follow-up request, so any 3xx response raises ``HTTPError``
    instead of being followed.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Deliberately produce no new request: abort redirect handling.
        return None
class Scanner(threading.Thread):
    """Worker thread that pulls URLs from a shared queue and checks each
    response body against ``SQL_ERROR_RE``.

    NOTE(review): the thread starts itself at the end of ``__init__``;
    callers must not call ``start()`` again.
    """

    def __init__(self, urls, results, lock, stop):
        # urls: queue.Queue of URLs to test (each contains a '*' marker)
        # results: shared list collecting URLs whose response matched
        # lock: guards logging and appends to the shared results list
        # stop: threading.Event telling the workers to exit
        super().__init__()
        self.urls = urls
        self.results = results
        self.lock = lock
        self.stop = stop
        self.daemon = True
        self.start()

    def run(self):
        """Consume URLs from the queue until the stop event is set."""
        while not self.stop.is_set():
            try:
                # Non-blocking get: an empty queue loops straight back to
                # the stop-event check (busy-waits while idle).
                url = self.urls.get_nowait()
            except queue.Empty:
                continue
            try:
                # '" . mysqli_real_escape_string($link, trim($_GET['id'])) . "'
                url = url.replace('*', "'--%20[")
                req = Request(url, headers={'User-Agent': self.user_agent()})
                opener = build_opener(RedirectHandler())
                with opener.open(req, timeout=5) as resp:
                    content = resp.read()
                    # Fall back to UTF-8 when the server omits a charset.
                    enc = resp.info().get_content_charset('utf-8')
                    content = content.decode(enc, 'replace')
                match = SQL_ERROR_RE.search(content)
                if match:
                    with self.lock:
                        logger.info('Found "%s" at url "%s".', match.group(0), url)
                        self.results.append(url)
            except Exception as e:
                # Network/HTTP errors (including blocked redirects) are
                # logged and the worker moves on to the next URL.
                with self.lock:
                    logger.error(e)
            finally:
                # Always mark the item done so urls.join() can return.
                self.urls.task_done()

    def user_agent(self):
        # TODO: randomize the user agent
        return 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.135 Safari/537.36 Edge/12.246'
def enqueue(o):
    """Build and return a :class:`queue.Queue` holding each element of *o*,
    preserving iteration order."""
    out = queue.Queue()
    for item in o:
        out.put(item)
    return out
def scan(urls, max_threads=MAX_THREADS):
    """Scan *urls* and return the list of URLs whose responses matched
    a SQL error signature.

    Spawns at most *max_threads* ``Scanner`` workers (never more than
    there are URLs), waits for the queue to drain, then signals the
    workers to stop and joins them.
    """
    pending = enqueue(urls)
    results = []
    lock = threading.Lock()
    stop = threading.Event()
    logger.info('Start scanning.')
    workers = [
        Scanner(pending, results, lock, stop)
        for _ in range(min(max_threads, pending.qsize()))
    ]
    pending.join()
    stop.set()
    for worker in workers:
        worker.join()
    logger.info('Scanning finished.')
    return results
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    parser = argparse.ArgumentParser(description='SQLiScanner by tz4678@gmail.com.')
    parser.add_argument('-f', '--file', default='urls.txt', help='file with urls', type=str)
    parser.add_argument('-o', '--out', default='results.txt', help='file with results', type=str)
    parser.add_argument('-m', '--max_threads', default=MAX_THREADS, help='max number of threads', type=int)
    args = parser.parse_args()
    with open(args.file, 'r', encoding='utf-8') as fp:
        # Skip blank lines so they are not queued as empty URLs.
        urls = [line.strip() for line in fp if line.strip()]
    results = scan(urls, args.max_threads)
    with open(args.out, 'w', encoding='utf-8') as fp:
        # BUG FIX: writelines() adds no separators, which produced one long
        # concatenated line; write exactly one URL per line instead.
        fp.writelines(url + '\n' for url in results)
    # input() only reacts to Enter, so prompt for that rather than "any key".
    input('Press Enter to exit.')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement