Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import json
import os
import re
import sys
import urllib.parse
from datetime import datetime

import browser_cookie3
import dateparser
import pytz
import requests
from bs4 import BeautifulSoup
def sanitize_for_windows(filename):
    """Remove or replace invalid characters for Windows filenames, including control characters."""
    # Windows forbids <>:"/\|?* and control chars 0x00-0x1F in file names.
    cleaned = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', filename)
    # Trailing dots and spaces are also illegal on Windows.
    cleaned = cleaned.rstrip('. ')
    # Keep well under the 260-char MAX_PATH limit.
    return cleaned[:250]
def extract_single_post(post_url, session):
    """Fetch a community post page with the cookie-bearing session and return the parsed post data.

    Returns None (after printing a diagnostic) on any network, HTTP, or parse failure.
    """
    try:
        response = session.get(post_url)
    except requests.RequestException as e:
        print(f"Error fetching the post URL: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch the post. Status code: {response.status_code}")
        return None

    # YouTube embeds the page payload as a JS assignment inside a <script> tag.
    soup = BeautifulSoup(response.text, 'html.parser')
    yt_initial_data = None
    for script in soup.find_all('script'):
        script_text = script.string
        if script_text and 'var ytInitialData' in script_text:
            yt_initial_data = script_text
            break

    if not yt_initial_data:
        print("Failed to find initial data in the page.")
        return None

    json_data = extract_json_from_script(yt_initial_data)
    if not json_data:
        print("Failed to parse the initial data JSON.")
        return None

    return parse_single_post_from_json(json_data)
def extract_json_from_script(script_text):
    """Extract and decode the JSON object assigned to ytInitialData in a script body.

    Returns the parsed dict, or None if the assignment is absent or malformed.
    """
    found = re.search(r'var ytInitialData = ({.*?});', script_text, re.DOTALL)
    if not found:
        return None
    try:
        return json.loads(found.group(1))
    except json.JSONDecodeError:
        print("JSON decode error.")
        return None
def parse_single_post_from_json(json_data):
    """Walk the ytInitialData tabs and return the first post found in the selected tab, or None."""
    try:
        tabs = json_data['contents']['twoColumnBrowseResultsRenderer']['tabs']
        for tab in tabs:
            renderer = tab.get('tabRenderer') if 'tabRenderer' in tab else None
            # Only the currently selected tab holds the community content.
            if not renderer or not renderer.get('selected', False):
                continue
            sections = renderer['content']['sectionListRenderer']['contents']
            for section in sections:
                parsed = extract_post(section)
                if parsed:
                    return parsed
    except KeyError as e:
        print(f"KeyError while parsing post: {e}")
    return None
def extract_post(item):
    """Return the post payload from the first backstagePostThreadRenderer in the item, or None."""
    try:
        entries = item.get('itemSectionRenderer', {}).get('contents', [])
        for entry in entries:
            if 'backstagePostThreadRenderer' in entry:
                return entry['backstagePostThreadRenderer']['post']
    except KeyError as e:
        print(f"KeyError while extracting post: {e}")
    return None
def get_next_folder_number():
    """Return the next zero-padded 3-digit folder number based on folders in the current directory."""
    # Collect the numeric prefixes of all directories named like "NNN ...".
    numbers = []
    for entry in os.listdir():
        if not os.path.isdir(entry):
            continue
        prefix = re.match(r'^(\d{3})', entry)
        if prefix:
            numbers.append(int(prefix.group(1)))
    if not numbers:
        return "001"
    return f"{max(numbers) + 1:03d}"
def get_original_image_url(img_url):
    """Strip the trailing '=<resolution>' suffix from an image URL path to get the full-size image."""
    parts = urllib.parse.urlsplit(img_url)
    # Googleusercontent thumbnails end with e.g. "=s640-c"; drop that segment.
    original_path = re.sub(r"=[^&]+$", "", parts.path)
    # Rebuild without query or fragment components.
    return urllib.parse.urlunsplit((parts.scheme, parts.netloc, original_path, '', ''))
def download_image(img_url, filename, session):
    """Download an image from a URL and save it to a file.

    Uses the shared cookie-bearing session so members-only images are reachable.
    Failures are printed, never raised, so one bad image does not abort the post.
    """
    try:
        response = session.get(img_url, stream=True)
        if response.status_code == 200:
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            # Fix: the original printed a placeholder-less f-string
            # ("Downloaded image: (unknown)") instead of the saved path.
            print(f"Downloaded image: {filename}")
        else:
            print(f"Failed to download image {img_url}. Status code: {response.status_code}")
    except Exception as e:
        print(f"Error downloading image {img_url}: {e}")
def process_post(post_data, session, browser_name, post_url):
    """Process a single post: save its text/metadata to a numbered folder and download its images.

    Args:
        post_data: dict containing a 'backstagePostRenderer' payload.
        session: requests session carrying browser cookies.
        browser_name: accepted for interface compatibility; not used here.
        post_url: original post URL, recorded in post.txt.
    """
    post_renderer = post_data.get('backstagePostRenderer', {})
    if not post_renderer:
        print("Post renderer data not found. Skipping post.")
        return
    post_id = post_renderer.get('postId', '')
    if not post_id:
        print("Post ID not found. Skipping post.")
        return

    timestamp_str = _resolve_timestamp(post_renderer, post_id)

    # Concatenate all text runs into the full post body.
    content_runs = post_renderer.get('contentText', {}).get('runs', [])
    content_text = ''.join(run.get('text', '') for run in content_runs)

    like_count = post_renderer.get('voteCount', {}).get('simpleText', '0')
    visibility = post_renderer.get('visibility', {}).get('simpleText', 'Members')

    # Folder name: "NNN <date> - <first 50 chars of content, sanitized>".
    truncated_content = sanitize_for_windows(content_text[:50])
    folder_number = get_next_folder_number()
    folder_name = f"{folder_number} {timestamp_str} - {truncated_content}"
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)

    # Save post.txt with content and metadata.
    post_txt_path = os.path.join(folder_name, 'post.txt')
    try:
        with open(post_txt_path, 'w', encoding='utf-8') as f:
            f.write(f"Date: {timestamp_str}\n")
            f.write(f"URL: {post_url}\n")
            f.write(f"Visibility: {visibility}\n")
            f.write(f"Likes: {like_count}\n")
            f.write("Content:\n")
            f.write(content_text)
        print(f"Post saved in: {post_txt_path}")
    except Exception as e:
        print(f"Error writing post.txt: {e}")

    # Download every attached image at original resolution.
    for idx, img_url in enumerate(_collect_image_urls(post_renderer), start=1):
        img_extension = os.path.splitext(urllib.parse.urlsplit(img_url).path)[1] or '.webp'
        img_filename = os.path.join(folder_name, f'image_{idx}{img_extension}')
        download_image(img_url, img_filename, session)


def _resolve_timestamp(post_renderer, post_id):
    """Derive a '%Y' or '%Y-%m' folder timestamp (US/Eastern) from the post's relative published time."""
    eastern = pytz.timezone('US/Eastern')
    published_time_text = post_renderer.get('publishedTimeText', {}).get('runs', [{}])[0].get('text', '')
    if published_time_text:
        published_time_text = re.sub(r'\(edited\)', '', published_time_text).strip()
        # dateparser resolves relative phrases like "2 weeks ago" against now (UTC).
        timestamp_dt = dateparser.parse(
            published_time_text,
            settings={'TIMEZONE': 'UTC', 'RETURN_AS_TIMEZONE_AWARE': True},
        )
        if timestamp_dt:
            timestamp_dt = timestamp_dt.astimezone(eastern)
            # "N years ago" is only year-accurate; don't pretend to know the month.
            if "year" in published_time_text.lower():
                return timestamp_dt.strftime('%Y')
            return timestamp_dt.strftime('%Y-%m')
        print(f"Failed to parse 'publishedTimeText' for post {post_id}. Using current time as fallback.")
    else:
        print(f"No published time found for post {post_id}. Using current time as fallback.")
    return datetime.now(pytz.utc).astimezone(eastern).strftime('%Y-%m')


def _collect_image_urls(post_renderer):
    """Collect de-duplicated original-resolution image URLs attached to the post, in order."""
    images = []
    seen_images = set()

    def add_thumbnails(thumbnails):
        # Each thumbnail size maps to the same original URL once the suffix is stripped.
        for img in thumbnails:
            img_url = get_original_image_url(img.get('url', ''))
            if img_url and img_url not in seen_images:
                seen_images.add(img_url)
                images.append(img_url)

    backstage_attachment = post_renderer.get('backstageAttachment', {})
    if 'postMultiImageRenderer' in backstage_attachment:
        # Posts with multiple images.
        for img_data in backstage_attachment['postMultiImageRenderer'].get('images', []):
            image_renderer = img_data.get('backstageImageRenderer', {})
            add_thumbnails(image_renderer.get('image', {}).get('thumbnails', []))
    elif 'backstageImageRenderer' in backstage_attachment:
        # Single-image post.
        image_renderer = backstage_attachment['backstageImageRenderer']
        add_thumbnails(image_renderer.get('image', {}).get('thumbnails', []))
    return images
def main():
    """Interactively archive YouTube community posts, authenticating with browser cookies."""
    if len(sys.argv) < 2:
        print("Usage: python script_name.py <browser_name>")
        print("Example: python script_name.py firefox")
        return

    browser_name = sys.argv[1].lower()
    cookie_loaders = {
        'firefox': browser_cookie3.firefox,
        'chrome': browser_cookie3.chrome,
    }
    loader = cookie_loaders.get(browser_name)
    if loader is None:
        print(f"Unsupported browser: {browser_name}")
        return
    try:
        cj = loader()
    except Exception as e:
        print(f"Error extracting cookies from {browser_name}: {e}")
        return

    session = requests.Session()
    session.cookies = cj

    print("Enter a full YouTube community post URL or 'exit' to quit:")
    while True:
        post_url = input("Full URL: ").strip()
        if post_url.lower() == 'exit':
            print("Exiting...")
            break
        # Only accept direct community-post links (".../community?lb=<post id>").
        if not re.match(r'^https?://(www\.)?youtube\.com/channel/.+/community\?lb=\w+', post_url):
            print("Invalid URL. Please enter a valid YouTube community post URL.")
            continue
        print(f"Processing URL: {post_url}")
        post_data = extract_single_post(post_url, session)
        if not post_data:
            print("No post data found or an error occurred.")
            continue
        process_post(post_data, session, browser_name, post_url)
        print("Post archived successfully.")
        print("\nReady for the next URL. Enter 'exit' to quit.")


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment