YouTube Post Archiver
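Archives a single YouTube community post (text, metadata, and images) to a local folder, using cookies from a local browser so that members-only posts can be fetched.
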

a guest
Nov 30th, 2024
56
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 10.63 KB | None | 0 0
import requests
import browser_cookie3
import os
import re
import sys
import json
import urllib.parse
from bs4 import BeautifulSoup
from datetime import datetime
import pytz
import dateparser

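# The imports above assume these PyPI packages are installed
# (names as published on PyPI):
#   pip install requests browser-cookie3 beautifulsoup4 pytz dateparser
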
def sanitize_for_windows(filename):
    """Remove or replace invalid characters for Windows filenames, including control characters."""
    sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '_', filename)
    sanitized = sanitized.rstrip('. ')
    return sanitized[:250]

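# Illustrative example with a hypothetical title, showing reserved characters
# replaced by underscores (trailing dots/spaces would also be stripped):
#   sanitize_for_windows('Q&A: "Whats next?" <part 1/2>')
#   -> 'Q&A_ _Whats next__ _part 1_2_'
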
def extract_single_post(post_url, session):
    """Extracts data for a single community post using the provided URL."""
    try:
        response = session.get(post_url)
    except requests.RequestException as e:
        print(f"Error fetching the post URL: {e}")
        return None

    if response.status_code != 200:
        print(f"Failed to fetch the post. Status code: {response.status_code}")
        return None

    soup = BeautifulSoup(response.text, 'html.parser')
    scripts = soup.find_all('script')

    yt_initial_data = None
    for script in scripts:
        if script.string and 'var ytInitialData' in script.string:
            yt_initial_data = script.string
            break

    if not yt_initial_data:
        print("Failed to find initial data in the page.")
        return None

    json_data = extract_json_from_script(yt_initial_data)
    if not json_data:
        print("Failed to parse the initial data JSON.")
        return None

    return parse_single_post_from_json(json_data)

def extract_json_from_script(script_text):
    """Extracts JSON data from a JavaScript variable assignment."""
    match = re.search(r'var ytInitialData = ({.*?});', script_text, re.DOTALL)
    if match:
        json_text = match.group(1)
        try:
            return json.loads(json_text)
        except json.JSONDecodeError:
            print("JSON decode error.")
    return None

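# Note: the non-greedy match above assumes the first '};' terminates the
# assignment; a '};' occurring inside a JSON string literal would truncate
# the match. A minimal illustration:
#   extract_json_from_script('var ytInitialData = {"a": 1};')  ->  {'a': 1}
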
def parse_single_post_from_json(json_data):
    """Parses a single community post from JSON data."""
    try:
        contents = json_data['contents']['twoColumnBrowseResultsRenderer']['tabs']
        for tab in contents:
            if 'tabRenderer' in tab and tab['tabRenderer'].get('selected', False):
                community_tab = tab['tabRenderer']
                community_content = community_tab['content']['sectionListRenderer']['contents']
                for item in community_content:
                    post = extract_post(item)
                    if post:
                        return post
    except KeyError as e:
        print(f"KeyError while parsing post: {e}")
    return None

def extract_post(item):
    """Extracts an individual post from the JSON item."""
    try:
        item_contents = item.get('itemSectionRenderer', {}).get('contents', [])
        for content in item_contents:
            if 'backstagePostThreadRenderer' in content:
                return content['backstagePostThreadRenderer']['post']
    except KeyError as e:
        print(f"KeyError while extracting post: {e}")
    return None

def get_next_folder_number():
    """Automatically calculates the next folder number based on existing folders."""
    existing_folders = [f for f in os.listdir() if os.path.isdir(f) and re.match(r'^\d{3}', f)]
    if not existing_folders:
        return "001"

    folder_numbers = []
    for folder in existing_folders:
        match = re.match(r'^(\d{3})', folder)
        if match:
            folder_numbers.append(int(match.group(1)))
    if not folder_numbers:
        return "001"

    next_number = max(folder_numbers) + 1
    return f"{next_number:03d}"

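# Illustrative example with hypothetical folder names: if the working
# directory already contains '001 2024-10 - Hello' and '002 2024-11 - World',
# the next call returns '003'.
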
def get_original_image_url(img_url):
    """Removes the resolution parameters from an image URL to get the original size."""
    url_parts = urllib.parse.urlsplit(img_url)
    cleaned_path = re.sub(r"=[^&]+$", "", url_parts.path)  # Remove the resolution part
    return urllib.parse.urlunsplit((url_parts.scheme, url_parts.netloc, cleaned_path, '', ''))

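# Illustrative example with a hypothetical thumbnail URL; the '=...' size
# directive at the end of the path is stripped:
#   get_original_image_url('https://yt3.ggpht.com/abc123=s640-c')
#   -> 'https://yt3.ggpht.com/abc123'
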
def download_image(img_url, filename, session):
    """Downloads an image from a URL and saves it to a file."""
    try:
        response = session.get(img_url, stream=True)
        if response.status_code == 200:
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(1024):
                    f.write(chunk)
            print(f"Downloaded image: {filename}")
        else:
            print(f"Failed to download image {img_url}. Status code: {response.status_code}")
    except Exception as e:
        print(f"Error downloading image {img_url}: {e}")

def process_post(post_data, session, post_url):
    """Processes a single post and saves the required data."""
    eastern = pytz.timezone('US/Eastern')

    post_renderer = post_data.get('backstagePostRenderer', {})
    if not post_renderer:
        print("Post renderer data not found. Skipping post.")
        return

    post_id = post_renderer.get('postId', '')
    if not post_id:
        print("Post ID not found. Skipping post.")
        return

    # Extract published time
    published_time_text = post_renderer.get('publishedTimeText', {}).get('runs', [{}])[0].get('text', '')
    timestamp_str = None

    if published_time_text:
        published_time_text = re.sub(r'\(edited\)', '', published_time_text).strip()
        timestamp_dt = dateparser.parse(published_time_text, settings={'TIMEZONE': 'UTC', 'RETURN_AS_TIMEZONE_AWARE': True})
        if timestamp_dt:
            timestamp_dt = timestamp_dt.astimezone(eastern)
            if "year" in published_time_text.lower():
                timestamp_str = timestamp_dt.strftime('%Y')
            else:
                timestamp_str = timestamp_dt.strftime('%Y-%m')
        else:
            print(f"Failed to parse 'publishedTimeText' for post {post_id}. Using current time as fallback.")
            timestamp_dt = datetime.now(pytz.utc).astimezone(eastern)
            timestamp_str = timestamp_dt.strftime('%Y-%m')
    else:
        print(f"No published time found for post {post_id}. Using current time as fallback.")
        timestamp_dt = datetime.now(pytz.utc).astimezone(eastern)
        timestamp_str = timestamp_dt.strftime('%Y-%m')

    # Extract content text
    content_text = ''
    content_runs = post_renderer.get('contentText', {}).get('runs', [])
    for run in content_runs:
        content_text += run.get('text', '')

    # Extract likes count
    like_count = post_renderer.get('voteCount', {}).get('simpleText', '0')

    # Extract visibility if available
    visibility = post_renderer.get('visibility', {}).get('simpleText', 'Members')

    # Truncate content for folder name
    truncated_content = sanitize_for_windows(content_text[:50])

    # Get the next available folder number
    folder_number = get_next_folder_number()

    # Construct folder name using the folder number and date
    folder_name = f"{folder_number} {timestamp_str} - {truncated_content}"
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)

    # Save post.txt with content and metadata
    post_txt_path = os.path.join(folder_name, 'post.txt')
    try:
        with open(post_txt_path, 'w', encoding='utf-8') as f:
            f.write(f"Date: {timestamp_str}\n")
            f.write(f"URL: {post_url}\n")
            f.write(f"Visibility: {visibility}\n")
            f.write(f"Likes: {like_count}\n")
            f.write("Content:\n")
            f.write(content_text)
        print(f"Post saved in: {post_txt_path}")
    except Exception as e:
        print(f"Error writing post.txt: {e}")

    # Extract and download images
    images = []
    seen_images = set()

    backstage_attachment = post_renderer.get('backstageAttachment', {})
    if 'postMultiImageRenderer' in backstage_attachment:
        # Handle posts with multiple images
        multi_images = backstage_attachment['postMultiImageRenderer'].get('images', [])
        for img_data in multi_images:
            image_renderer = img_data.get('backstageImageRenderer', {})
            thumbnails = image_renderer.get('image', {}).get('thumbnails', [])
            for img in thumbnails:
                img_url = get_original_image_url(img.get('url', ''))
                if img_url and img_url not in seen_images:
                    seen_images.add(img_url)
                    images.append(img_url)
    elif 'backstageImageRenderer' in backstage_attachment:
        # Single image
        image_renderer = backstage_attachment['backstageImageRenderer']
        thumbnails = image_renderer.get('image', {}).get('thumbnails', [])
        for img in thumbnails:
            img_url = get_original_image_url(img.get('url', ''))
            if img_url and img_url not in seen_images:
                seen_images.add(img_url)
                images.append(img_url)

    # Download images
    for idx, img_url in enumerate(images, start=1):
        img_extension = os.path.splitext(urllib.parse.urlsplit(img_url).path)[1] or '.webp'
        img_filename = os.path.join(folder_name, f'image_{idx}{img_extension}')
        download_image(img_url, img_filename, session)

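# Resulting on-disk layout for one archived post (illustrative; folder number,
# date, and image count vary):
#   001 2024-11 - First fifty chars of the post/
#       post.txt
#       image_1.webp
#       image_2.webp
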
def main():
    if len(sys.argv) < 2:
        print("Usage: python script_name.py <browser_name>")
        print("Example: python script_name.py firefox")
        return

    browser_name = sys.argv[1].lower()

    try:
        if browser_name == 'firefox':
            cj = browser_cookie3.firefox()
        elif browser_name == 'chrome':
            cj = browser_cookie3.chrome()
        else:
            print(f"Unsupported browser: {browser_name}")
            return
    except Exception as e:
        print(f"Error extracting cookies from {browser_name}: {e}")
        return

    session = requests.Session()
    session.cookies = cj

    print("Enter a full YouTube community post URL or 'exit' to quit:")

    while True:
        post_url = input("Full URL: ").strip()
        if post_url.lower() == 'exit':
            print("Exiting...")
            break

        # Validate YouTube community post URL (post IDs may contain hyphens,
        # so match [\w-]+ rather than \w+)
        if not re.match(r'^https?://(www\.)?youtube\.com/channel/.+/community\?lb=[\w-]+', post_url):
            print("Invalid URL. Please enter a valid YouTube community post URL.")
            continue

        print(f"Processing URL: {post_url}")

        post_data = extract_single_post(post_url, session)
        if not post_data:
            print("No post data found or an error occurred.")
            continue

        process_post(post_data, session, post_url)
        print("Post archived successfully.")
        print("\nReady for the next URL. Enter 'exit' to quit.")

if __name__ == '__main__':
    main()

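# Example session (illustrative; channel and post IDs elided):
#   $ python script_name.py firefox
#   Enter a full YouTube community post URL or 'exit' to quit:
#   Full URL: https://www.youtube.com/channel/UC.../community?lb=Ugkx...
#   Processing URL: https://www.youtube.com/channel/UC.../community?lb=Ugkx...
#   Post saved in: 001 2024-11 - .../post.txt
#   Post archived successfully.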