Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import argparse
- import asyncio
- import aiohttp
- from datetime import timedelta
- import itertools
- import string
- import time
- from tqdm import tqdm
# Tunable defaults for the CLI flags defined in the __main__ block.
DEFAULTS = {
    "retries": 5,        # attempts per URL on connection errors
    "delay": 1,          # seconds between retries
    "timeout": 15,       # per-request timeout, in seconds
    "max_workers": 20,   # concurrent worker tasks
    "proxy_type": "socks5",
}
# Request the GitHub REST API v3 JSON representation.
HEADERS = {'Accept': 'application/vnd.github.v3+json'}
async def check_url(session, progress, proxy_obj, repository_path, hex_string, retries=None, delay=None):
    """Probe GitHub for a dangling commit whose SHA starts with hex_string.

    Does a HEAD against the API to see whether the commit exists, then fetches
    the commit page and looks for GitHub's "does not belong to any branch"
    banner, which marks a dangling commit.

    Returns:
        "Found: <url>" when a dangling commit is detected,
        "Failed to connect..." when all retries were exhausted on errors,
        None otherwise (commit missing, or belongs to a branch).

    retries semantics (matches the --retries help text):
        N > 0 -> up to N attempts on connection errors
        0     -> retry forever
        -1    -> single attempt, no retries
    """
    progress.set_description(f"Current: {hex_string}")
    url = f"https://api.github.com/repos/{repository_path}/commits/{hex_string}"

    def get_proxy():
        # BUG FIX: the original lambda tested the *global* `proxies` instead of
        # this worker's proxy_obj, so per-worker rotation state was bypassed.
        if not proxy_obj['proxies']:
            return None
        return proxy_obj['type'] + '://' + proxy_obj['proxies'][proxy_obj['index']]

    # BUG FIX: the original loop condition `retries == -1 or attempt < retries`
    # inverted the documented semantics: -1 ("disable") looped forever and
    # 0 ("unlimited") never attempted at all.
    max_attempts = 1 if retries == -1 else (retries or 0)
    attempt = 0
    while max_attempts == 0 or attempt < max_attempts:
        try:
            async with session.head(url, proxy=get_proxy()) as response:
                if response.status == 200:
                    # Commit exists; check the HTML page for the fork banner.
                    url = f"https://github.com/{repository_path}/commit/{hex_string}"
                    async with session.get(url, proxy=get_proxy()) as response:
                        text = await response.text()
                        if "This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository." in text:
                            return f"Found: {url}"
                elif response.headers.get('X-RateLimit-Remaining', '0') == '0':
                    # Rate limited: rotate to the next proxy.
                    proxy_obj['index'] = (proxy_obj['index'] + 1) % len(proxy_obj['proxies']) if proxy_obj['proxies'] else 0
                    # Once every proxy has been exhausted, wait out the reset window.
                    if proxy_obj['index'] == 0:
                        reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
                        sleep_time = int(max(reset_time - time.time(), 0))
                        while sleep_time > 0:
                            progress.set_description(f"Proxy and rate limit reached, sleeping for {timedelta(seconds=sleep_time)}")
                            await asyncio.sleep(1)
                            sleep_time -= 1
                else:
                    # Commit does not exist (e.g. 404) — nothing to retry.
                    break
        except aiohttp.ClientError:
            # Only connection errors count against the retry budget.
            if delay:
                await asyncio.sleep(delay)
            attempt += 1
    if attempt and attempt == max_attempts:
        return f"Failed to connect to {url} - skipping."
async def worker(queue, semaphore, session, progress, proxy_type, proxies, repository_path, retries, delay, output_file):
    """Consume hex prefixes from `queue` and report dangling commits.

    Runs until the None sentinel is received. Hits are echoed through
    tqdm.write and, when a log file was given, appended to it.
    """
    # Per-worker proxy rotation state, mutated by check_url.
    state = {'proxies': proxies, 'index': 0, 'type': proxy_type}
    while True:
        async with semaphore:
            candidate = await queue.get()
            if candidate is None:
                break  # sentinel: no more work
            outcome = await check_url(session, progress, state, repository_path, candidate, retries, delay)
            if not isinstance(outcome, Exception) and outcome is not None:
                tqdm.write(outcome)
                # Persist the hit to the output file, if one was requested.
                if output_file is not None:
                    output_file.write(outcome + "\n")
            progress.update(1)
            queue.task_done()
async def main(args, hex_combinations, total_combinations, cookies, proxies, output_file):
    """Drive the scan: open a session, spin up workers, feed them work, clean up."""
    timeout = aiohttp.ClientTimeout(args.timeout) if args.timeout else None
    async with aiohttp.ClientSession(timeout=timeout, cookies=cookies, headers=HEADERS) as session:
        progress = tqdm(total=total_combinations, desc="Starting", bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}')
        semaphore = asyncio.Semaphore(args.max_workers)
        # Bounded queue so the combination generator is consumed lazily
        # instead of being materialised up front.
        queue = asyncio.Queue(maxsize=args.max_workers)
        workers = [
            asyncio.create_task(
                worker(queue, semaphore, session, progress, args.proxy_type, proxies,
                       args.repository_path, args.retries, args.delay, output_file)
            )
            for _ in range(args.max_workers)
        ]
        for hex_string in hex_combinations:
            await queue.put(''.join(hex_string))
        await queue.join()
        # One sentinel per worker signals shutdown.
        for _ in range(args.max_workers):
            await queue.put(None)
        await asyncio.gather(*workers, return_exceptions=True)
        progress.close()
        if output_file is not None:
            output_file.close()
def custom_combinations(chars, repeat):
    """Yield length-`repeat` tuples drawn from `chars`, letters-first.

    Characters are ordered with letters before digits. For each subset size
    (from `repeat` down to 1) every combination of that size is padded with
    every possible fill and all distinct permutations of the padded tuple are
    emitted. The same tuple can arise from several (combination, fill)
    decompositions, so duplicates are yielded.
    """
    ordered = sorted(chars, key=lambda ch: (ch.isdigit(), ch))
    for size in range(repeat, 0, -1):
        for base in itertools.combinations(ordered, size):
            for padding in itertools.product(ordered, repeat=repeat - size):
                yield from set(itertools.permutations(base + padding, repeat))
if __name__ == "__main__":
    # CLI entry point: parse flags, load optional proxies/cookies/log file,
    # then run the async scan until completion or Ctrl-C.
    parser = argparse.ArgumentParser(description="GitHub dangling commit bruteforcer. by softzer0 & ChatGPT")
    parser.add_argument("repository_path", type=str, help="Path to the repository, in format: owner/repository")
    parser.add_argument("--proxies_file", type=str, default=None, help="Proxies to use for requests. Should be in format IP:port.")
    parser.add_argument("--proxy_type", choices=['http', 'https', 'socks5', 'socks4'], default=DEFAULTS['proxy_type'], help=f"The type of the proxy. Default is {DEFAULTS['proxy_type']}.")
    parser.add_argument('--token', type=str, help="GitHub personal access token to use in API requests.")
    parser.add_argument("--cookies_file", type=str, default=None, help="File containing cookies to use for requests.")
    parser.add_argument("--log_file", type=str, default=None, help="File to write the results to. If not specified, they won't be written to a file.")
    parser.add_argument("--timeout", type=int, default=DEFAULTS['timeout'], help=f"Timeout in seconds for each request. Default is {DEFAULTS['timeout']}. Set to 0 for no timeout.")
    parser.add_argument("--retries", type=int, default=DEFAULTS['retries'], help=f"Number of retries for each URL. Default is {DEFAULTS['retries']}. Set to -1 to disable, 0 for unlimited.")
    parser.add_argument("--delay", type=float, default=DEFAULTS['delay'], help=f"Delay in seconds between each retry. Default is {DEFAULTS['delay']}. Set to 0 for no delay.")
    parser.add_argument("--max_workers", type=int, default=DEFAULTS['max_workers'], help=f"Number of worker instances. Default is {DEFAULTS['max_workers']}.")
    args = parser.parse_args()
    output_file = None
    # NOTE(review): aiohttp.CookieJar.load() returns None, so this expression
    # always yields cookies=None and --cookies_file is effectively a no-op.
    # The real fix needs main() to accept a jar via ClientSession(cookie_jar=...)
    # — left unchanged here to avoid an interface change; verify against main().
    cookies = aiohttp.CookieJar().load(args.cookies_file) if args.cookies_file else None
    if args.token:
        HEADERS['Authorization'] = f"token {args.token}"
    proxies = None
    if args.proxies_file:
        # BUG FIX: readlines() keeps trailing newlines, which produced broken
        # proxy URLs like "socks5://1.2.3.4:1080\n". Strip each line and drop
        # blanks.
        with open(args.proxies_file, "r") as f:
            proxies = [line.strip() for line in f if line.strip()]
    # Open the results log, if requested (closed by main()).
    if args.log_file:
        output_file = open(args.log_file, "w")
    # Generate candidate 7-character hex prefixes (tuples of characters).
    hex_combinations = custom_combinations(string.hexdigits.lower(), 7)
    # NOTE(review): this counts all strings of length 1..7, while
    # custom_combinations yields only length-7 tuples (with duplicates), so the
    # progress-bar total is an approximation — confirm against the generator.
    total_combinations = sum(len(string.hexdigits.lower()) ** r for r in range(1, 8))
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(main(args, hex_combinations, total_combinations, cookies, proxies, output_file))
    except KeyboardInterrupt:
        # Cancel all outstanding tasks and drain them before closing the loop.
        tqdm.write("Stopping...")
        tasks = asyncio.all_tasks(loop)
        for task in tasks:
            task.cancel()
        loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
    finally:
        loop.close()
        tqdm.write("Stopped.")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement