Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- """
- Ripley.cl stealth browser with Cloudflare bypass and session persistence.
- Uses Camoufox for anti-detection.
- """
- import asyncio
- import json
- import os
- import random
- import time
- from pathlib import Path
- from typing import Optional
- from camoufox.async_api import AsyncCamoufox
async def save_session(page, context, session_file: str = "session_cookies.json"):
    """Persist cookies and web storage to a JSON file for later reuse.

    Args:
        page: Page object; its ``evaluate`` is used to read ``localStorage``
            and ``sessionStorage`` from the currently loaded document.
        context: Browser context whose ``cookies()`` are saved.
        session_file: Destination path of the JSON session file.

    Returns:
        True if the session file was written, False if collecting the cookies
        or writing the file failed (the error is printed, not raised).
    """
    try:
        cookies = await context.cookies()

        # Storage capture is best-effort: a failure here must not prevent the
        # cookies from being saved. Only ``Exception`` is caught so task
        # cancellation and KeyboardInterrupt still propagate.
        storage_data = {}
        try:
            storage_data = (await page.evaluate("""
                () => {
                    const local = {};
                    const session = {};
                    for (let i = 0; i < localStorage.length; i++) {
                        const key = localStorage.key(i);
                        local[key] = localStorage.getItem(key);
                    }
                    for (let i = 0; i < sessionStorage.length; i++) {
                        const key = sessionStorage.key(i);
                        session[key] = sessionStorage.getItem(key);
                    }
                    return { localStorage: local, sessionStorage: session };
                }
            """)) or {}
        except Exception as storage_error:
            print(f"[!] Could not read web storage: {storage_error}")

        session_data = {
            'cookies': cookies,
            'storage': storage_data,
        }
        with open(session_file, 'w', encoding='utf-8') as f:
            json.dump(session_data, f, indent=2)

        total_items = (
            len(cookies)
            + len(storage_data.get('localStorage', {}))
            + len(storage_data.get('sessionStorage', {}))
        )
        print(f"[+] Session saved: {session_file} ({len(cookies)} cookies, {total_items} total items)")
        return True
    except Exception as e:
        print(f"[!] Failed to save session: {e}")
        return False
async def load_session(page, context, session_file: str = "session_cookies.json"):
    """Restore cookies and web storage from a previously saved session file.

    Two file layouts are accepted: a bare JSON list of cookies (old format)
    or a dict with ``cookies`` and ``storage`` keys (new format).

    Args:
        page: Page object; its ``evaluate`` is used to write ``localStorage``
            and ``sessionStorage`` entries into the current document.
        context: Browser context that receives the cookies via ``add_cookies``.
        session_file: Path of the JSON session file to read.

    Returns:
        True if a non-empty session was loaded, False if the file is missing,
        empty, malformed, or the cookies could not be applied.
    """
    try:
        if not os.path.exists(session_file):
            print(f"[!] No session found: {session_file}")
            return False

        with open(session_file, 'r', encoding='utf-8') as f:
            session_data = json.load(f)

        if isinstance(session_data, list):
            # Old format: the file is just the cookie array.
            cookies, storage_data = session_data, {}
        elif isinstance(session_data, dict):
            # ``or`` also normalizes explicit JSON nulls to empty containers.
            cookies = session_data.get('cookies') or []
            storage_data = session_data.get('storage') or {}
        else:
            print("[!] Unrecognized session file format")
            return False

        if not cookies and not storage_data:
            print("[!] Empty session file")
            return False

        if cookies:
            await context.add_cookies(cookies)

        if storage_data:
            # Best-effort: storage is written into whatever document `page`
            # currently holds.
            # NOTE(review): the caller in this file invokes load_session
            # before navigating, so this may target a blank document and
            # fail - confirm whether storage restore is actually effective.
            try:
                await page.evaluate("""
                    (storageData) => {
                        const local = storageData.localStorage || {};
                        const session = storageData.sessionStorage || {};
                        for (const [key, value] of Object.entries(local)) {
                            localStorage.setItem(key, value);
                        }
                        for (const [key, value] of Object.entries(session)) {
                            sessionStorage.setItem(key, value);
                        }
                    }
                """, storage_data)
            except Exception as storage_error:
                # Only ``Exception`` is caught so cancellation still propagates.
                print(f"[!] Could not restore web storage: {storage_error}")

        total_items = (
            len(cookies)
            + len(storage_data.get('localStorage', {}))
            + len(storage_data.get('sessionStorage', {}))
        )
        print(f"[+] Session loaded: {session_file} ({len(cookies)} cookies, {total_items} total items)")
        return True
    except Exception as e:
        print(f"[!] Failed to load session: {e}")
        return False
def is_session_valid(
    session_file: str = "session_cookies.json",
    max_age_hours: int = 24,
    max_uses: Optional[int] = None
):
    """Check that a session file exists, is recent enough, and is not overused.

    Args:
        session_file: Path of the JSON session file.
        max_age_hours: Maximum allowed age, measured from the file's
            modification time.
        max_uses: Maximum recorded ``use_count`` before the session counts as
            exhausted. A falsy value (``None`` or ``0``) disables the check.

    Returns:
        True if the session may be reused, False otherwise (including when
        the check itself fails).
    """
    try:
        if not os.path.exists(session_file):
            return False

        file_age_hours = (time.time() - os.path.getmtime(session_file)) / 3600
        if file_age_hours > max_age_hours:
            print(f"[!] Session expired ({file_age_hours:.1f}h old)")
            return False

        if max_uses:
            # Derive "<name>_stats.json" from the final extension only.
            # A plain str.replace('.json', ...) returns the session path
            # itself when the name has no ".json" and also rewrites ".json"
            # inside directory names.
            root, ext = os.path.splitext(session_file)
            base = root if ext == '.json' else session_file
            stats_file = f"{base}_stats.json"

            if os.path.exists(stats_file):
                try:
                    with open(stats_file, 'r', encoding='utf-8') as f:
                        stats = json.load(f)
                    use_count = stats.get('use_count', 0)
                    if use_count >= max_uses:
                        print(f"[!] Session overused ({use_count}/{max_uses} requests)")
                        return False
                except (OSError, ValueError, AttributeError, TypeError) as stats_error:
                    # Unreadable or malformed stats must not invalidate an
                    # otherwise good session; report and carry on.
                    print(f"[!] Ignoring unreadable usage stats: {stats_error}")

        print(f"[+] Valid session found ({file_age_hours:.1f}h old)")
        return True
    except Exception as e:
        print(f"[!] Error checking session: {e}")
        return False
def increment_session_usage(session_file: str = "session_cookies.json"):
    """Increment and persist the usage counter kept next to a session file.

    The counter lives in ``<name>_stats.json`` beside ``session_file`` and
    holds ``use_count``, ``created_at`` and ``last_used``.

    Args:
        session_file: Path of the session file whose usage is tracked.

    Returns:
        The new use count, or None if the stats file could not be written.
    """
    # Derive the stats path from the final extension only, so a session file
    # without ".json" never maps onto itself (which would overwrite it).
    root, ext = os.path.splitext(session_file)
    base = root if ext == '.json' else session_file
    stats_file = f"{base}_stats.json"

    now = time.time()
    stats = {'use_count': 0, 'created_at': now, 'last_used': now}

    if os.path.exists(stats_file):
        try:
            with open(stats_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # Unreadable or corrupt stats: restart counting from the defaults.
            loaded = None
        if isinstance(loaded, dict):
            stats = loaded

    previous = stats.get('use_count', 0)
    if not isinstance(previous, int):
        previous = 0  # tolerate hand-edited or damaged counters
    stats['use_count'] = previous + 1
    stats['last_used'] = now

    try:
        with open(stats_file, 'w', encoding='utf-8') as f:
            json.dump(stats, f, indent=2)
    except OSError:
        return None
    return stats['use_count']
def rotate_session(session_file: str = "session_cookies.json"):
    """Delete a session file and its usage stats so a new session is created.

    Args:
        session_file: Path of the session file to discard.

    Returns:
        True if both files are gone afterwards (including when they never
        existed), False if a removal failed.
    """
    # Derive the stats path from the final extension only; a plain
    # str.replace('.json', ...) yields the session path itself for names
    # without ".json", leaving the real stats file behind.
    root, ext = os.path.splitext(session_file)
    base = root if ext == '.json' else session_file
    stats_file = f"{base}_stats.json"

    try:
        if os.path.exists(session_file):
            os.remove(session_file)
            print(f"[+] Session rotated: {session_file}")

        if os.path.exists(stats_file):
            os.remove(stats_file)

        return True
    except (OSError, ValueError) as e:
        print(f"[!] Failed to rotate session: {e}")
        return False
async def wait_for_cloudflare(
    page,
    max_wait: int = 40,
    skip_check: bool = False,
    *,
    initial_delay: float = 2.0,
    poll_interval: float = 1.0,
):
    """Poll the page until the Cloudflare challenge is gone and content loaded.

    The page counts as ready when either (a) no challenge marker is present
    and the HTML is larger than 50,000 chars and mentions one of the expected
    site markers, or (b) the HTML size has been stable (changes under 1,000
    chars) for three consecutive polls and exceeds 100,000 chars.

    Args:
        page: Page object providing async ``title()`` and ``content()``.
        max_wait: Overall time budget in seconds, including ``initial_delay``.
        skip_check: If True, return True immediately without inspecting the
            page (used when a saved session was loaded).
        initial_delay: Seconds to wait before the first poll.
        poll_interval: Seconds to wait between polls.

    Returns:
        True if the page looks ready (or the check was skipped), False if
        ``max_wait`` elapsed first.
    """
    if skip_check:
        print("[+] Using saved session")
        return True

    # All markers are lowercase because they are matched against lowercased
    # text. The original compared "dataLayer" with lowercased content, which
    # could never match.
    challenge_markers = (
        "just a moment",
        "checking your browser",
        "please wait",
        "verifying you are human",
        "cf-browser-verification",
        "challenge-running",
        "challenge-platform",
    )
    content_markers = ("ripley", "producto", "gtag", "datalayer")

    print("Checking for Cloudflare...")
    start_time = time.monotonic()  # monotonic: immune to wall-clock jumps
    await asyncio.sleep(initial_delay)

    last_content_length = 0
    stable_count = 0

    while time.monotonic() - start_time < max_wait:
        try:
            # ``except Exception`` (not bare) so task cancellation propagates.
            try:
                title = await page.title()
            except Exception:
                title = ""

            try:
                content = await page.content()
            except Exception:
                # Typically raised while a navigation is still in flight.
                print(f" Page loading... ({int(time.monotonic() - start_time)}s)")
                await asyncio.sleep(2)
                continue

            content_length = len(content)
            lowered_content = content.lower()
            lowered_title = title.lower()

            has_challenge = any(
                marker in lowered_content or marker in lowered_title
                for marker in challenge_markers
            )
            has_real_content = content_length > 50000 and any(
                marker in lowered_content for marker in content_markers
            )

            if abs(content_length - last_content_length) < 1000:
                stable_count += 1
            else:
                stable_count = 0
            last_content_length = content_length

            if not has_challenge and has_real_content:
                print(f"[+] Cloudflare bypassed ({content_length:,} chars)")
                return True

            if stable_count >= 3 and content_length > 100000:
                print(f"[+] Content loaded ({content_length:,} chars)")
                return True

            elapsed = int(time.monotonic() - start_time)
            if elapsed > 0 and elapsed % 5 == 0:
                status = "challenge" if has_challenge else "loading"
                print(f" [{status}] {content_length:,} chars, {elapsed}s")

            await asyncio.sleep(poll_interval)
        except Exception as e:
            print(f" Error: {str(e)[:50]}")
            await asyncio.sleep(2)

    print(f"[!] Timeout after {max_wait}s")
    return False
async def human_like_behavior(page):
    """Simulate a short burst of human-looking mouse movement and scrolling.

    Moves the mouse to one or two random points, smooth-scrolls down in two
    random steps, then scrolls back to the top, with small random pauses
    between actions.

    Args:
        page: Page object providing ``mouse.move`` and ``evaluate``.
    """
    print("Simulating human behavior...")
    await asyncio.sleep(random.uniform(0.3, 0.8))

    for _ in range(random.randint(1, 2)):
        x = random.randint(100, 800)
        y = random.randint(100, 600)
        await page.mouse.move(x, y)
        await asyncio.sleep(random.uniform(0.05, 0.15))

    # The scroll target is passed as an argument instead of being formatted
    # into the script text, so one constant script serves every call.
    scroll_script = "(top) => window.scrollTo({ top: top, behavior: 'smooth' })"

    scroll_positions = (
        random.randint(300, 800),
        random.randint(800, 1500),
    )
    for scroll_y in scroll_positions:
        await page.evaluate(scroll_script, scroll_y)
        await asyncio.sleep(random.uniform(0.3, 0.7))

    await page.evaluate(scroll_script, 0)
    await asyncio.sleep(random.uniform(0.2, 0.5))
    print("[+] Behavior simulation complete")
async def browse_and_extract_html(
    url: str,
    output_file: str = "output.html",
    headless: bool = False,
    max_retries: int = 3,
    use_session: bool = True,
    session_file: str = "ripley_session.json",
    session_max_age_hours: int = 24,
    session_max_uses: Optional[int] = None
) -> Optional[str]:
    """Browse a URL with the Camoufox browser and extract the rendered HTML.

    Each attempt launches a fresh browser, optionally restores a saved
    session, navigates, waits for the Cloudflare check, simulates some user
    activity, then writes the HTML and a screenshot to disk and saves the
    session.

    Args:
        url: Target URL.
        output_file: Path the HTML is written to; the screenshot goes next to
            it with a ``.png`` extension.
        headless: Run without GUI.
        max_retries: Number of attempts before giving up.
        use_session: Enable session persistence.
        session_file: Session storage path.
        session_max_age_hours: Max session age before rotation (default: 24h).
        session_max_uses: Max requests per session before rotation
            (default: None/unlimited).

    Returns:
        The page HTML on success, or None if ``max_retries`` is zero or less.

    Raises:
        Exception: The error of the final attempt is re-raised once all
            attempts have failed.
    """
    has_saved_session = use_session and is_session_valid(
        session_file,
        max_age_hours=session_max_age_hours,
        max_uses=session_max_uses
    )

    # A session file that exists but failed validation is stale: remove it so
    # this run creates a fresh one.
    if use_session and not has_saved_session and os.path.exists(session_file):
        rotate_session(session_file)

    banner = "=" * 70

    for attempt in range(max_retries):
        try:
            print(f"\n{banner}")
            print(f"Attempt {attempt + 1}/{max_retries}")
            print(banner)
            print("Starting Camoufox...")

            async with AsyncCamoufox(
                headless=headless,
                humanize=True,
                geoip=True,
                exclude_addons=[],
                os="windows",
            ) as browser:
                print("[+] Browser launched")
                context = browser.contexts[0] if browser.contexts else await browser.new_context()

                print(f"Navigating to: {url}")
                # The page must be opened in `context`: browser.new_page()
                # creates its own separate context, so cookies loaded into
                # (and later saved from) `context` would never apply to it.
                page = await context.new_page()

                session_loaded = False
                if use_session and has_saved_session:
                    session_loaded = await load_session(page, context, session_file)

                await page.set_extra_http_headers({
                    'Accept-Language': 'es-CL,es;q=0.9,en;q=0.8',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'DNT': '1',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1',
                })

                # Navigation problems are only reported: the challenge wait
                # and the content-length check below decide success.
                try:
                    print("Loading page...")
                    response = await page.goto(
                        url,
                        wait_until="commit",
                        timeout=60000
                    )
                    if response:
                        status = response.status
                        print(f"[+] Response: {status}")
                        if status == 403:
                            print(" [!] Cloudflare challenge detected")
                except Exception as nav_error:
                    print(f"[!] Navigation warning: {nav_error}")

                await asyncio.sleep(2)

                challenge_passed = await wait_for_cloudflare(
                    page,
                    max_wait=40,
                    skip_check=session_loaded
                )
                if challenge_passed:
                    print("Waiting for dynamic content...")
                    await asyncio.sleep(1)
                else:
                    print("Waiting for content...")
                    await asyncio.sleep(2)

                await human_like_behavior(page)

                print("Extracting HTML...")
                html_content = await page.content()
                if len(html_content) < 500:
                    # RuntimeError is an Exception subclass, so existing
                    # callers that catch Exception are unaffected.
                    raise RuntimeError("Content too short - blocked")
                print(f"[+] Extracted {len(html_content):,} chars")

                if "cloudflare" in html_content.lower() and len(html_content) < 5000:
                    print("[!] Warning: Possible Cloudflare page")
                else:
                    print("[+] Content OK")

                with open(output_file, 'w', encoding='utf-8') as f:
                    f.write(html_content)
                print(f"[+] Saved: {output_file}")

                # Swap only a trailing .html/.htm extension. A plain
                # str.replace('.html', '.png') returns output_file unchanged
                # when it has no ".html", and the screenshot would then
                # overwrite the HTML that was just written.
                root, ext = os.path.splitext(output_file)
                if ext.lower() in ('.html', '.htm'):
                    screenshot_file = f"{root}.png"
                else:
                    screenshot_file = f"{output_file}.png"
                try:
                    await page.screenshot(path=screenshot_file, full_page=False)
                    print(f"[+] Screenshot: {screenshot_file}")
                except Exception:
                    # The screenshot is optional; the extraction still succeeds.
                    print("[!] Screenshot failed")

                if use_session:
                    await save_session(page, context, session_file)
                    # Track session usage
                    use_count = increment_session_usage(session_file)
                    if use_count and session_max_uses:
                        print(f"[+] Session usage: {use_count}/{session_max_uses}")
                    elif use_count:
                        print(f"[+] Session usage: {use_count}")

                await page.close()

                print(f"\n{banner}")
                print("[SUCCESS]")
                print(f"{banner}\n")
                return html_content
        except Exception as e:
            print(f"\n[ERROR] Attempt {attempt + 1}: {e}")
            if attempt < max_retries - 1:
                # Randomized back-off that grows with the attempt number.
                wait_time = random.uniform(3, 6) * (attempt + 1)
                print(f"Retrying in {wait_time:.1f}s...")
                await asyncio.sleep(wait_time)
            else:
                print("\n[FAILED] All attempts exhausted")
                raise

    return None
async def main():
    """Run one demo scrape of a ripley.cl search page and print a report."""
    rule = "=" * 70

    print(f"\n{rule}")
    print("RIPLEY.CL STEALTH BROWSER")
    print(f"{rule}\n")

    # Run configuration.
    target_url = "https://simple.ripley.cl/search/adagio%20teas"
    html_path = "ripley_search.html"
    session_path = "ripley_session.json"
    run_headless = True
    persist_session = True

    # Session rotation config (adjust based on use case).
    rotate_after_hours = 24  # rotate after 24 hours
    rotate_after_uses = 50   # rotate after 50 requests (None for unlimited)

    for line in (
        f"URL: {target_url}",
        f"Output: {html_path}",
        f"Session: {session_path}",
        f"Headless: {run_headless}",
        f"Session rotation: {rotate_after_hours}h or {rotate_after_uses or 'unlimited'} uses",
        "Features: Human behavior, GeoIP, Anti-detection",
    ):
        print(line)

    try:
        html = await browse_and_extract_html(
            target_url,
            html_path,
            headless=run_headless,
            max_retries=3,
            use_session=persist_session,
            session_file=session_path,
            session_max_age_hours=rotate_after_hours,
            session_max_uses=rotate_after_uses
        )

        if not html:
            print("\n[FAILED]")
            return

        def section(title):
            # Print a section heading framed by two rules.
            print(f"\n{rule}")
            print(title)
            print(rule)

        section("SUMMARY")
        print(f"Chars: {len(html):,}")
        print(f"Lines: {len(html.splitlines()):,}")

        section("PREVIEW (first 800 chars)")
        print(html[:800])
        if len(html) > 800:
            print("...")

        section("CONTENT CHECK")
        lowered = html.lower()
        checks = [
            ("Ripley branding", "ripley" in lowered),
            ("Product data", "product" in lowered),
            ("Search results", "search" in lowered or "resultado" in lowered),
            ("Price info", "$" in html or "precio" in lowered),
            # The framework markers are matched case-sensitively on the raw HTML.
            ("JS framework", any(token in html for token in ["__NEXT_DATA__", "react", "angular", "vue"])),
        ]
        for label, found in checks:
            marker = "[+]" if found else "[-]"
            print(f"{marker} {label}")

        print(f"\n{rule}")
        print("[SUCCESS]")
        print(f"{rule}\n")
    except Exception as e:
        print(f"\n{rule}")
        print(f"[FATAL] {e}")
        print(f"{rule}\n")
        raise


# Script entry point: run the async main() on a new event loop.
if __name__ == "__main__":
    asyncio.run(main())
Advertisement
Add Comment
Please, Sign In to add comment