Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import random
- import string
- import asyncio
- import functools
- import os
- import uvloop
- import aiodns
- import click
- import socket
- import sys
- from tqdm import tqdm
- from aiodnsbrute.logger import ConsoleLogger
- class aioDNSBrute(object):
- """aiodnsbrute implements fast domain name brute forcing using Python's asyncio module."""
- def __init__(self, verbosity=0, max_tasks=512):
- """Constructor.
- Args:
- verbosity: set output verbosity: 0 (default) is none, 3 is debug
- max_tasks: the maximum number of tasks asyncio will queue (default 512)
- """
- self.tasks = []
- self.errors = []
- self.fqdn = []
- self.ignore_hosts = []
- asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
- self.loop = asyncio.get_event_loop()
- self.resolver = aiodns.DNSResolver(loop=self.loop, rotate=True)
- self.sem = asyncio.BoundedSemaphore(max_tasks)
- self.max_tasks = max_tasks
- self.verbosity = verbosity
- self.logger = ConsoleLogger(verbosity)
- async def _dns_lookup(self, name):
- """Performs a DNS request using aiodns, self.lookup_type is set by the run function.
- A query for A record returns <ares_query_a_result> which does not return metadata about
- when a CNAME was resolved (just host and ttl attributes) however it should be faster.
- The <ares_host_result> returned by gethostbyname contains name, aliases, and addresses, if
- name is different in response we can surmise that the original domain was a CNAME entry.
- Args:
- name: the domain name to resolve
- Returns:
- object: <ares_query_a_result> if query, <ares_host_result> if gethostbyname
- """
- if self.lookup_type == "query":
- return await self.resolver.query(name, "A")
- elif self.lookup_type == "gethostbyname":
- return await self.resolver.gethostbyname(name, socket.AF_INET)
- def _dns_result_callback(self, name, future):
- """Handles the pycares object passed by the _dns_lookup function. We expect an errror to
- be present in the returned object because most lookups will be for names that don't exist.
- c-ares errors are passed through directly, error types can be identified in ares_strerror.c
- Args:
- name: original lookup name (because the query_result object doesn't contain it)
- future: the completed future (pycares dns result)
- """
- # Record processed we can now release the lock
- self.sem.release()
- # Handle known exceptions, barf on other ones
- if future.exception() is not None:
- try:
- err_number = future.exception().args[0]
- err_text = future.exception().args[1]
- except IndexError:
- self.logger.error(f"Couldn't parse exception: {future.exception()}")
- # handle the DNS errors we expect to receive, show user unexpected errors
- if err_number == 4:
- # This is domain name not found, ignore it
- pass
- elif err_number == 12:
- # Timeout from DNS server
- self.logger.warn(f"Timeout for {name}")
- elif err_number == 1:
- # Server answered with no data
- pass
- else:
- self.logger.error(
- f"{name} generated an unexpected exception: {future.exception()}"
- )
- # for debugging/troubleshoooting keep a list of errors
- # self.errors.append({'hostname': name, 'error': err_text})
- # parse and output and store results.
- else:
- if self.lookup_type == "query":
- ips = [ip.host for ip in future.result()]
- cname = False
- row = f"{name:<30}\t{ips}"
- elif self.lookup_type == "gethostbyname":
- r = future.result()
- ips = [ip for ip in r.addresses]
- if name == r.name:
- cname = False
- n = f"""{name:<30}\t{f"{'':<35}" if self.verbosity >= 2 else ""}"""
- else:
- cname = True
- # format the name based on verbosity - this is kluge
- short_cname = f"{r.name[:28]}.." if len(r.name) > 30 else r.name
- n = f'{name}{"**" if self.verbosity <= 1 else ""}'
- n = f'''{n:<30}\t{f"CNAME {short_cname:<30}" if self.verbosity >= 2 else ""}'''
- row = f"{n:<30}\t{ips}"
- # store the result
- if set(ips) != set(self.ignore_hosts):
- self.logger.success(row)
- dns_lookup_result = {"domain": name, "ip": ips}
- if self.lookup_type == "gethostbyname" and cname:
- dns_lookup_result["cname"] = r.name
- dns_lookup_result["aliases"] = r.aliases
- self.fqdn.append(dns_lookup_result)
- self.logger.debug(future.result())
- self.tasks.remove(future)
- if self.verbosity >= 1:
- self.pbar.update()
- async def _queue_lookups(self, wordlist, domain):
- """Takes a list of words and adds them to the async loop also passing the original
- lookup domain name; then attaches the processing callback to deal with the result.
- Args:
- wordlist: a list of names to perform lookups for
- domain: the base domain to perform brute force against
- """
- for word in wordlist:
- # Wait on the semaphore before adding more tasks
- await self.sem.acquire()
- host = f"{word.strip()}.{domain}"
- task = asyncio.ensure_future(self._dns_lookup(host))
- task.add_done_callback(functools.partial(self._dns_result_callback, host))
- self.tasks.append(task)
- await asyncio.gather(*self.tasks, return_exceptions=True)
- def run(
- self, wordlist, domain, resolvers=None, wildcard=True, verify=True, query=True
- ):
- """
- Sets up the bruteforce job, does domain verification, sets resolvers, checks for wildcard
- response to lookups, and sets the query type to be used. After all this, open the wordlist
- file and start the brute force - with ^C handling to cleanup nicely.
- Args:
- wordlist: a string containing a path to a filename to be used as a wordlist
- domain: the base domain name to be used for lookups
- resolvers: a list of DNS resolvers to be used (default None, uses system resolvers)
- wildcard: bool, do wildcard dns detection (default true)
- verify: bool, check if domain exists (default true)
- query: bool, use query to do lookups (default true), false means gethostbyname is used.
- Returns:
- dict containing result of lookups
- """
- self.logger.info(
- f"Brute forcing {domain} with a maximum of {self.max_tasks} concurrent tasks..."
- )
- if verify:
- self.logger.info(f"Using local resolver to verify {domain} exists.")
- try:
- socket.gethostbyname(domain)
- except socket.gaierror as err:
- self.logger.error(
- f"Couldn't resolve {domain}, use the --no-verify switch to ignore this error."
- )
- raise SystemExit(
- self.logger.error(f"Error from host lookup: {err}")
- )
- else:
- self.logger.warn("Skipping domain verification. YOLO!")
- if resolvers:
- self.resolver.nameservers = resolvers
- self.logger.info(
- f"Using recursive DNS with the following servers: {self.resolver.nameservers}"
- )
- if wildcard:
- # 63 chars is the max allowed segment length, there is practically no chance that it will be a legit record
- random_sld = (
- lambda: f'{"".join(random.choice(string.ascii_lowercase + string.digits) for i in range(63))}'
- )
- try:
- self.lookup_type = "query"
- wc_check = self.loop.run_until_complete(
- self._dns_lookup(f"{random_sld()}.{domain}")
- )
- except aiodns.error.DNSError as err:
- # we expect that the record will not exist and error 4 will be thrown
- self.logger.info(
- f"No wildcard response was detected for this domain."
- )
- wc_check = None
- finally:
- if wc_check is not None:
- self.ignore_hosts = [host.host for host in wc_check]
- self.logger.warn(
- f"Wildcard response detected, ignoring answers containing {self.ignore_hosts}"
- )
- else:
- self.logger.warn("Wildcard detection is disabled")
- if query:
- self.logger.info(
- "Using pycares `query` function to perform lookups, CNAMEs cannot be identified"
- )
- self.lookup_type = "query"
- else:
- self.logger.info(
- "Using pycares `gethostbyname` function to perform lookups, CNAME data will be appended to results (** denotes CNAME, show actual name with -vv)"
- )
- self.lookup_type = "gethostbyname"
- with open(wordlist, encoding="utf-8", errors="ignore") as words:
- w = words.read().splitlines()
- self.logger.info(f"Wordlist loaded, proceeding with {len(w)} DNS requests")
- try:
- if self.verbosity >= 1:
- self.pbar = tqdm(
- total=len(w), unit="rec", maxinterval=0.1, mininterval=0
- )
- self.loop.run_until_complete(self._queue_lookups(w, domain))
- except KeyboardInterrupt:
- self.logger.warn("Caught keyboard interrupt, cleaning up...")
- asyncio.gather(*asyncio.Task.all_tasks()).cancel()
- self.loop.stop()
- finally:
- self.loop.close()
- if self.verbosity >= 1:
- self.pbar.close()
- self.logger.info(f"Completed, {len(self.fqdn)} subdomains found")
- return self.fqdn
- @click.command()
- @click.option(
- "--wordlist",
- "-w",
- help="Wordlist to use for brute force.",
- default=f"{os.path.dirname(os.path.realpath(__file__))}/wordlists/bitquark_20160227_subdomains_popular_1000",
- )
- @click.option(
- "--max-tasks",
- "-t",
- default=512,
- help="Maximum number of tasks to run asynchronosly.",
- )
- @click.option(
- "--resolver-file",
- "-r",
- type=click.File("r"),
- default=None,
- help="A text file containing a list of DNS resolvers to use, one per line, comments start with #. Default: use system resolvers",
- )
- @click.option(
- "--verbosity", "-v", count=True, default=1, help="Increase output verbosity"
- )
- @click.option(
- "--output",
- "-o",
- type=click.Choice(["csv", "json", "off"]),
- default="off",
- help="Output results to DOMAIN.csv/json (extension automatically appended when not using -f).",
- )
- @click.option(
- "--outfile",
- "-f",
- type=click.File("w"),
- help="Output filename. Use '-f -' to send file output to stdout overriding normal output.",
- )
- @click.option(
- "--query/--gethostbyname",
- default=True,
- help="DNS lookup type to use query (default) should be faster, but won't return CNAME information.",
- )
- @click.option(
- "--wildcard/--no-wildcard",
- default=True,
- help="Wildcard detection, enabled by default",
- )
- @click.option(
- "--verify/--no-verify",
- default=True,
- help="Verify domain name is sane before beginning, enabled by default",
- )
- @click.version_option("0.3.2")
- @click.argument("domain", required=True)
- def main(**kwargs):
- """aiodnsbrute is a command line tool for brute forcing domain names utilizing Python's asyncio module.
- credit: blark (@markbaseggio)
- """
- output = kwargs.get("output")
- verbosity = kwargs.get("verbosity")
- resolvers = kwargs.get("resolver_file")
- if output is not "off":
- outfile = kwargs.get("outfile")
- # turn off output if we want JSON/CSV to stdout, hacky
- if outfile.__class__.__name__ == "TextIOWrapper":
- verbosity = 0
- if outfile is None:
- # wasn't specified on command line
- outfile = open(f'{kwargs["domain"]}.{output}', "w")
- if resolvers:
- lines = resolvers.read().splitlines()
- resolvers = [x.strip() for x in lines if (x and not x.startswith("#"))]
- bf = aioDNSBrute(verbosity=verbosity, max_tasks=kwargs.get("max_tasks"))
- results = bf.run(
- wordlist=kwargs.get("wordlist"),
- domain=kwargs.get("domain"),
- resolvers=resolvers,
- wildcard=kwargs.get("wildcard"),
- verify=kwargs.get("verify"),
- query=kwargs.get("query"),
- )
- if output in ("json"):
- import json
- json.dump(results, outfile)
- if output in ("csv"):
- import csv
- writer = csv.writer(outfile)
- writer.writerow(["Hostname", "IPs", "CNAME", "Aliases"])
- [
- writer.writerow(
- [
- r.get("domain"),
- r.get("ip", [""])[0],
- r.get("cname"),
- r.get("aliases", [""])[0],
- ]
- )
- for r in results
- ]
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement