Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # scrapy_chatgpt_spider.py
- """
- Scrapy Spider for ChatGPT conversations
- Optimized for Scrapy Cloud deployment
- """
- import scrapy
- from scrapy import signals
- from scrapy.crawler import CrawlerProcess
- import json
- from urllib.parse import urlparse
- import time
class ChatGPTSpider(scrapy.Spider):
    """Scrape shared ChatGPT conversation pages rendered via scrapy-playwright.

    Spider arguments:
        urls_file: path to a text file with one URL per line.
        batch_id:  identifier stamped onto every emitted item (also used in
                   the S3 feed path).
    """

    name = 'chatgpt_scraper'

    custom_settings = {
        'CONCURRENT_REQUESTS': 32,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 16,
        'DOWNLOAD_DELAY': 0.5,
        'RANDOMIZE_DOWNLOAD_DELAY': True,
        'COOKIES_ENABLED': False,
        'RETRY_TIMES': 3,
        'RETRY_HTTP_CODES': [500, 502, 503, 504, 408, 429],
        # Scrapy-Playwright settings. Both schemes must go through the
        # Playwright handler (the original only routed https, so any http
        # URL or redirect bypassed the browser), and scrapy-playwright
        # requires the asyncio Twisted reactor.
        'DOWNLOAD_HANDLERS': {
            'http': 'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
            'https': 'scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler',
        },
        'TWISTED_REACTOR': 'twisted.internet.asyncioreactor.AsyncioSelectorReactor',
        'PLAYWRIGHT_BROWSER_TYPE': 'chromium',
        'PLAYWRIGHT_LAUNCH_OPTIONS': {
            'headless': True,
            'args': [
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-dev-shm-usage',
                '--disable-accelerated-2d-canvas',
                '--no-first-run',
                '--no-zygote',
                '--disable-gpu'
            ]
        },
        # AutoThrottle for adaptive delays
        'AUTOTHROTTLE_ENABLED': True,
        'AUTOTHROTTLE_START_DELAY': 0.5,
        'AUTOTHROTTLE_MAX_DELAY': 10.0,
        'AUTOTHROTTLE_TARGET_CONCURRENCY': 16.0,
        # Stats for monitoring
        'STATS_CLASS': 'scrapy.statscollectors.MemoryStatsCollector',
        # Export settings
        'FEEDS': {
            's3://your-bucket/chatgpt-results/%(batch_id)s/%(name)s_%(time)s.jsonl': {
                'format': 'jsonlines',
                'store_empty': False,
                'batch_item_count': 1000,
            }
        }
    }

    def __init__(self, urls_file=None, batch_id='default', *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.batch_id = batch_id
        # Load URLs (one per line, blank lines ignored).
        self.start_urls = []
        if urls_file:
            with open(urls_file) as f:
                self.start_urls = [url.strip() for url in f if url.strip()]
        # Stats tracking
        self.processed_count = 0
        self.success_count = 0
        self.error_count = 0

    def start_requests(self):
        """Issue every start URL through Playwright.

        Without these meta keys the Playwright branch in parse() can never
        run: 'playwright' routes the request through the browser, and
        'playwright_include_page' is what exposes the page object as
        response.meta['playwright_page'].
        """
        for url in self.start_urls:
            yield scrapy.Request(
                url,
                meta={'playwright': True, 'playwright_include_page': True},
                callback=self.parse,
            )

    async def parse(self, response):
        """Parse a rendered ChatGPT conversation page.

        Yields one item per response: a success item with the extracted
        messages, or an error item when the request was not rendered by
        Playwright.
        """
        if 'playwright' in response.meta:
            page = response.meta['playwright_page']
            try:
                # Wait for the conversation container, then give client-side
                # rendering a moment to settle.
                await page.wait_for_selector('main', timeout=15000)
                await page.wait_for_timeout(2000)
                # Extract title/messages in the browser context; tries a
                # cascade of selectors because the ChatGPT DOM changes often.
                conversation_data = await page.evaluate('''() => {
                    const extractMessages = () => {
                        const messages = [];
                        const selectors = [
                            '[data-message-author-role]',
                            '.text-base',
                            '[class*="markdown"]',
                            '.group'
                        ];
                        let elements = [];
                        for (const selector of selectors) {
                            elements = document.querySelectorAll(selector);
                            if (elements.length > 0) break;
                        }
                        elements.forEach((el, idx) => {
                            const isUser = el.closest('[data-message-author-role="user"]') ||
                                (idx % 2 === 0);
                            messages.push({
                                role: isUser ? 'user' : 'assistant',
                                content: el.textContent.trim(),
                                index: idx
                            });
                        });
                        return messages;
                    };
                    return {
                        title: document.title,
                        messages: extractMessages(),
                        conversation_id: window.location.pathname.split('/').pop()
                    };
                }''')
            finally:
                # Always release the page, even when the wait times out —
                # leaked pages accumulate in the shared browser.
                await page.close()

            self.success_count += 1
            yield {
                'url': response.url,
                'title': conversation_data['title'],
                'messages': conversation_data['messages'],
                'conversation_id': conversation_data['conversation_id'],
                'batch_id': self.batch_id,
                'timestamp': time.time(),
                'status': 'success'
            }
        else:
            # Request was not routed through Playwright; record the failure.
            self.error_count += 1
            yield {
                'url': response.url,
                'status': 'error',
                'error': 'Playwright not available',
                'batch_id': self.batch_id,
                'timestamp': time.time()
            }

        self.processed_count += 1
        # Log progress every 100 URLs
        if self.processed_count % 100 == 0:
            self.logger.info(
                f"Progress: {self.processed_count} processed, "
                f"{self.success_count} successful, {self.error_count} errors"
            )

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        # Hook spider_closed to the crawler's signal bus for final stats.
        spider = super().from_crawler(crawler, *args, **kwargs)
        crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
        return spider

    def spider_closed(self, spider):
        """Generate final statistics"""
        stats = {
            'spider': self.name,
            'batch_id': self.batch_id,
            'total_processed': self.processed_count,
            'successful': self.success_count,
            'errors': self.error_count,
            'success_rate': (self.success_count / self.processed_count * 100) if self.processed_count > 0 else 0
        }
        self.logger.info(f"Spider closed. Stats: {json.dumps(stats, indent=2)}")
- # Distributed runner for local testing
class DistributedScrapyRunner:
    """Split a URL list into per-process chunk files and crawl them in
    parallel OS processes (for local testing)."""

    def __init__(self, urls_file, num_processes=4):
        """Read *urls_file* and write one ``urls_chunk_<i>.txt`` per process.

        URLs are distributed as evenly as possible: the first ``remainder``
        chunks receive one extra URL. The original integer-division split
        produced empty chunk files (and dumped everything into the last
        chunk) whenever len(urls) < num_processes.
        """
        self.urls_file = urls_file
        self.num_processes = num_processes

        # Load URLs, skipping blank lines.
        with open(urls_file) as f:
            all_urls = [url.strip() for url in f if url.strip()]

        chunk_size, remainder = divmod(len(all_urls), num_processes)
        self.url_chunks = []
        start = 0
        for i in range(num_processes):
            end = start + chunk_size + (1 if i < remainder else 0)
            chunk_file = f'urls_chunk_{i}.txt'
            with open(chunk_file, 'w') as f:
                f.write('\n'.join(all_urls[start:end]))
            self.url_chunks.append(chunk_file)
            start = end

    def run_spider(self, chunk_file, process_id):
        """Run one ChatGPTSpider over *chunk_file*.

        Blocking; intended to be invoked inside a child process. Each
        process writes its own jsonlines feed.
        """
        process = CrawlerProcess({
            'USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'LOG_LEVEL': 'INFO',
            'FEEDS': {
                f'results_process_{process_id}.jsonl': {
                    'format': 'jsonlines',
                }
            }
        })
        process.crawl(ChatGPTSpider, urls_file=chunk_file, batch_id=f'process_{process_id}')
        process.start()

    def run_distributed(self):
        """Launch one OS process per chunk and block until all finish."""
        import multiprocessing
        processes = []
        for i, chunk_file in enumerate(self.url_chunks):
            p = multiprocessing.Process(
                target=self.run_spider,
                args=(chunk_file, i)
            )
            p.start()
            processes.append(p)
            print(f"Started process {i} for {chunk_file}")
        # Wait for all processes
        for p in processes:
            p.join()
        print("All processes completed")
- # Scrapy Cloud deployment configuration
- """
- scrapinghub.yml:
- ---
- projects:
- default: YOUR_PROJECT_ID
- stacks:
- default: scrapy:2.11-playwright
- requirements:
- file: requirements.txt
- """
- # requirements.txt for Scrapy Cloud
- """
- scrapy>=2.11.0
- scrapy-playwright>=0.0.26
- boto3>=1.26.0
- """
- # Deploy script
def create_deploy_script():
    """Write ``deploy_to_scrapy_cloud.sh`` to the CWD and mark it executable.

    The script body is a *raw* string on purpose: in an ordinary
    triple-quoted string Python treats a trailing ``\\`` + newline as a
    line continuation and removes both, which silently collapsed the
    multi-line ``shub schedule ... \\`` invocation onto one line in the
    generated file.
    """
    import os
    import stat

    deploy_script = r"""#!/bin/bash
# Deploy to Scrapy Cloud

# Split URLs into batches
split -l 2000 urls.txt urls_batch_

# Deploy spider
shub deploy

# Schedule spider runs for each batch
for batch in urls_batch_*; do
    echo "Scheduling spider for $batch"

    # Upload batch to Scrapy Cloud
    shub items-api upload $batch

    # Schedule spider
    shub schedule chatgpt_scraper \
        -a urls_file=$batch \
        -a batch_id=$(basename $batch) \
        -a CONCURRENT_REQUESTS=32 \
        -a DOWNLOAD_DELAY=0.5

    sleep 2
done

echo "All spiders scheduled!"

# Monitor progress
watch -n 30 'shub jobs | head -20'
"""
    with open('deploy_to_scrapy_cloud.sh', 'w') as f:
        f.write(deploy_script)
    # Make the script directly runnable: ./deploy_to_scrapy_cloud.sh
    mode = os.stat('deploy_to_scrapy_cloud.sh').st_mode
    os.chmod('deploy_to_scrapy_cloud.sh',
             mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
    print("Created deploy_to_scrapy_cloud.sh")
- if __name__ == "__main__":
- import sys
- if len(sys.argv) < 2:
- print("Usage:")
- print(" python scrapy_chatgpt_spider.py urls.txt # Run locally")
- print(" python scrapy_chatgpt_spider.py --deploy # Create deploy script")
- print(" python scrapy_chatgpt_spider.py --distributed 8 # Run with 8 processes")
- sys.exit(1)
- if sys.argv[1] == '--deploy':
- create_deploy_script()
- elif sys.argv[1] == '--distributed':
- num_processes = int(sys.argv[2]) if len(sys.argv) > 2 else 4
- runner = DistributedScrapyRunner('urls.txt', num_processes)
- runner.run_distributed()
- else:
- # Run single spider
- process = CrawlerProcess()
- process.crawl(ChatGPTSpider, urls_file=sys.argv[1])
- process.start()
Advertisement
Add Comment
Please, Sign In to add comment