import requests
from parsel import Selector
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import os
import http.cookiejar
from itertools import repeat
import csv
import time

# Set user agent
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'
}


# Function to extract information from a video URL
def extract_info(url, cookies):
    try:
        # Use cookies for all requests
        response = requests.get(url, headers=headers, cookies=cookies)
        if response.status_code == 200:
            selector = Selector(response.text)

            # Extract upload date and duration directly from the JSON-LD script
            json_ld = selector.xpath('//script[@type="application/ld+json"]/text()')
            upload_date = json_ld.re_first(r'"uploadDate": "([^"]+)"')
            duration_str = json_ld.re_first(r'"duration": "([^"]+)"')

            # Extract the remaining fields from the page markup
            title = selector.css('h1::text').get()
            description = selector.css('#descript-cont::text').get()
            views_str = selector.css('.video-stat svg.views+span::text').get()
            comments_str = selector.css('.comments-label .num::text').get()
            channel_name = selector.css('.channel-name span::text').get()
            channel_path = selector.css('.channel-name::attr(href)').get()
            channel_url = f"https://www.vbox7.com{channel_path}" if channel_path else None

            # Derive the thumbnail URL from the video ID (the part after "play:")
            video_id = url.split(':')[-1]
            thumbnail = f"https://i49.vbox7.com/o/{video_id[:3]}/{video_id}0.jpg"

            # Collapse blank lines and extra whitespace
            description = ' '.join(description.strip().split()) if description else None
            channel_name = channel_name.strip() if channel_name else None

            # Strip thousands-separator spaces and convert views and comments to integers
            views = int(views_str.replace(' ', '')) if views_str else None
            comments = int(comments_str.replace(' ', '')) if comments_str else None

            # Return data as a tuple
            return (url, thumbnail, title, description, duration_str, upload_date,
                    views, comments, channel_name, channel_url)
        else:
            return None
    except requests.exceptions.RequestException as e:
        print(f"Error occurred: {e}")
        return None


# Main function
def main():
    # Prompt user for the path to the TXT file
    txt_file_path = input("Please enter the path to the TXT file: ")

    # Prompt user for chunk size and calculate the number of ranges
    # (ceiling division so a final partial chunk is still selectable)
    chunk_size = int(input("Enter the chunk size: "))
    with open(txt_file_path) as f:
        total_lines = sum(1 for _ in f)
    num_ranges = -(-total_lines // chunk_size)
    print(f"Total lines: {total_lines}, Chunk size: {chunk_size}, Number of ranges: {num_ranges}")

    # Prompt user to select a specific range
    range_selection = int(input(f"Enter the range number (1 - {num_ranges}): "))
    start_index = (range_selection - 1) * chunk_size
    end_index = min(range_selection * chunk_size, total_lines)
    print(f"Selected range: {range_selection}, Start index: {start_index}, End index: {end_index}")

    # Output CSV file name based on input TXT file name and selected range
    output_file = os.path.splitext(txt_file_path)[0] + f'_range{range_selection}_videos.csv'

    # Check if cookies.txt exists in the current folder
    if os.path.exists("cookies.txt"):
        # Load the cookies from the file using http.cookiejar (Netscape format)
        cookie_jar = http.cookiejar.MozillaCookieJar("cookies.txt")
        cookie_jar.load(ignore_discard=True, ignore_expires=True)
        cookies = requests.utils.dict_from_cookiejar(cookie_jar)
    else:
        # No cookies found, use an empty cookie jar
        cookies = requests.cookies.RequestsCookieJar()

    # Write the header row to the CSV, quoting every field for consistency with the data rows
    header_row = ['url', 'thumbnail', 'title', 'description', 'duration', 'upload_date',
                  'views', 'comments', 'channel_name', 'channel_url']
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        csv.writer(csvfile, quoting=csv.QUOTE_ALL).writerow(header_row)

    # Read IDs from the TXT file in the selected range and prepend the URL prefix
    with open(txt_file_path, 'r') as file:
        ids = [f"https://www.vbox7.com/play:{line.strip()}"
               for line in file.readlines()[start_index:end_index]]

    # Initialize progress bar
    with tqdm(total=len(ids), desc="Processing IDs") as pbar:
        # Use ThreadPoolExecutor for multithreading
        with ThreadPoolExecutor(max_workers=5) as executor:
            # executor.map yields results in submission order, so index i always
            # points at the ID that produced the current result. (extract_info
            # already catches RequestException, which covers SSLError, so failed
            # requests surface here as None rather than as raised exceptions.)
            for i, data in enumerate(executor.map(extract_info, ids, repeat(cookies))):
                if data is not None:
                    # Append the row, using the csv module to ensure quoting
                    with open(output_file, 'a', newline='', encoding='utf-8') as csvfile:
                        csv.writer(csvfile, quoting=csv.QUOTE_ALL).writerow(data)
                else:
                    # Log the unprocessed ID and back off before continuing
                    print("Error: Data is None")
                    print(f"Last unprocessed ID: {ids[i]}")
                    print("Waiting for 1 minute before continuing...")
                    time.sleep(60)
                # Update the progress bar on every iteration, including failures,
                # so its count stays aligned with the loop index
                pbar.update(1)


if __name__ == "__main__":
    main()
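
# --- Usage sketch (illustrative; the file names and IDs below are hypothetical) ---
# The script expects a plain TXT file with one vbox7 video ID per line, e.g.:
#
#   ids.txt:
#     abcdef123
#     ghijkl456
#
# An interactive run might then look like this (assuming the script is saved
# as scrape_vbox7.py):
#
#   $ python scrape_vbox7.py
#   Please enter the path to the TXT file: ids.txt
#   Enter the chunk size: 50
#   Total lines: 120, Chunk size: 50, Number of ranges: 3
#   Enter the range number (1 - 3): 1
#
# which would write ids_range1_videos.csv next to the input file. If a
# cookies.txt is present, it must be in the Netscape format that
# http.cookiejar.MozillaCookieJar reads (the format produced by common
# browser cookie-export extensions).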