import requests
from parsel import Selector
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import os
import http.cookiejar
from itertools import repeat
import csv
import time
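# Third-party dependencies: pip install requests parsel tqdm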

# Send a desktop Chrome User-Agent with every request
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36'
}

# Extract metadata for a single video URL
def extract_info(url, cookies):
    try:
        # Send the session cookies with every request
        response = requests.get(url, headers=headers, cookies=cookies)
        if response.status_code == 200:
            selector = Selector(response.text)
            # The upload date and duration live in the JSON-LD metadata script
            upload_date = selector.xpath('//script[@type="application/ld+json"]/text()').re_first(r'"uploadDate": "([^"]+)"')
            duration_str = selector.xpath('//script[@type="application/ld+json"]/text()').re_first(r'"duration": "([^"]+)"')
            # The remaining fields come from the page markup
            title = selector.css('h1::text').get()
            description = selector.css('#descript-cont::text').get()
            views_str = selector.css('.video-stat svg.views+span::text').get()
            comments_str = selector.css('.comments-label .num::text').get()
            channel_name = selector.css('.channel-name span::text').get()
            channel_path = selector.css('.channel-name::attr(href)').get()
            channel_url = f"https://www.vbox7.com{channel_path}" if channel_path else None
            # Derive the thumbnail URL from the video ID; play URLs look like
            # https://www.vbox7.com/play:<id>, so the ID is everything after the last ':'
            video_id = url.split(':')[-1]
            thumbnail = f"https://i49.vbox7.com/o/{video_id[:3]}/{video_id}0.jpg"
            # Normalise internal whitespace in the text fields
            description = ' '.join(description.strip().split()) if description else None
            channel_name = channel_name.strip() if channel_name else None
            # Counts use spaces as thousands separators; strip regular and
            # non-breaking spaces before converting to int
            views = int(views_str.replace(' ', '').replace('\xa0', '')) if views_str else None
            comments = int(comments_str.replace(' ', '').replace('\xa0', '')) if comments_str else None
            # Return the record in the same order as the CSV header
            return (url, thumbnail, title, description, duration_str, upload_date, views, comments, channel_name, channel_url)
        else:
            return None
    except requests.exceptions.RequestException as e:
        print(f"Error occurred: {e}")
        return None
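
# Example (hypothetical video ID) of fetching a single record without cookies:
#   data = extract_info("https://www.vbox7.com/play:abc123def0", {})
#   if data:
#       print(data[2], data[6])  # title and view count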

# Main function
def main():
    # Prompt for the input TXT file (one video ID per line)
    txt_file_path = input("Please enter the path to the TXT file: ")
    # Prompt for the chunk size and work out how many ranges the file splits into
    chunk_size = int(input("Enter the chunk size: "))
    with open(txt_file_path) as f:
        total_lines = sum(1 for _ in f)
    # Round up so a trailing partial chunk still gets its own range
    num_ranges = (total_lines + chunk_size - 1) // chunk_size
    print(f"Total lines: {total_lines}, Chunk size: {chunk_size}, Number of ranges: {num_ranges}")
    # Prompt for which range to process in this run
    range_selection = int(input(f"Enter the range number (1 - {num_ranges}): "))
    start_index = (range_selection - 1) * chunk_size
    end_index = min(range_selection * chunk_size, total_lines)
    print(f"Selected range: {range_selection}, Start index: {start_index}, End index: {end_index}")
    # Name the output CSV after the input file and the selected range
    output_file = os.path.splitext(txt_file_path)[0] + f'_range{range_selection}_videos.csv'
    # Reuse an exported browser session if cookies.txt sits in the current folder
    if os.path.exists("cookies.txt"):
        # Load the cookies from the file with http.cookiejar
        cookie_jar = http.cookiejar.MozillaCookieJar("cookies.txt")
        cookie_jar.load(ignore_discard=True, ignore_expires=True)
        cookies = requests.utils.dict_from_cookiejar(cookie_jar)
    else:
        # No cookie file found; make anonymous requests
        cookies = {}
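    # Note: MozillaCookieJar expects the Netscape cookie file format, i.e. a
    # "# Netscape HTTP Cookie File" header followed by one tab-separated line
    # per cookie (domain, subdomain flag, path, secure flag, expiry, name, value)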
    # Write the header row, quoted the same way as the data rows
    header_row = ['url', 'thumbnail', 'title', 'description', 'duration', 'upload_date', 'views', 'comments', 'channel_name', 'channel_url']
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        csv.writer(csvfile, quoting=csv.QUOTE_ALL).writerow(header_row)
    # Read the IDs in the selected range and prepend the play-URL prefix
    with open(txt_file_path, 'r') as file:
        ids = [f"https://www.vbox7.com/play:{line.strip()}" for line in file.readlines()[start_index:end_index]]
    # Initialise the progress bar
    with tqdm(total=len(ids), desc="Processing IDs") as pbar:
        # Fetch pages concurrently with a small thread pool
        with ThreadPoolExecutor(max_workers=5) as executor:
            # executor.map yields results in input order, so index i matches ids[i]
            for i, data in enumerate(executor.map(extract_info, ids, repeat(cookies))):
                try:
                    if data is not None:
                        # Append the row, quoting every field
                        with open(output_file, 'a', newline='', encoding='utf-8') as csvfile:
                            csvwriter = csv.writer(csvfile, quoting=csv.QUOTE_ALL)
                            csvwriter.writerow(data)
                    else:
                        # Log the ID that failed and back off for a minute
                        print("Error: Data is None")
                        print(f"Last unprocessed ID: {ids[i]}")
                        print("Waiting for 1 minute before continuing...")
                        time.sleep(60)
                except requests.exceptions.SSLError as e:
                    # Same back-off if an SSL error slips through
                    print(f"SSLError occurred: {e}")
                    print(f"Last unprocessed ID: {ids[i]}")
                    print("Waiting for 1 minute before continuing...")
                    time.sleep(60)
                # Always advance the progress bar, even after a failure
                pbar.update(1)

if __name__ == "__main__":
    main()
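
# Example run (file names are hypothetical); ids.txt holds one vbox7 video ID
# per line:
#   $ python vbox7_scraper.py
#   Please enter the path to the TXT file: ids.txt
#   Enter the chunk size: 1000
#   Total lines: 4500, Chunk size: 1000, Number of ranges: 5
#   Enter the range number (1 - 5): 2
# Rows for IDs 1000-1999 are written to ids_range2_videos.csv.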