"""
Download audio from YouTube, Bilibili, or direct MP3 links listed in a CSV.

Example usage:
    python download.py --csv_file "data.csv" --output_dir "downloads" --workers 4
"""
import argparse
import csv
import json
import multiprocessing as mp
import os
import random
import re
import subprocess
import time
import uuid
from urllib.parse import urlparse, parse_qs

import requests
from tqdm import tqdm
import yt_dlp

def setup_argparse():
    parser = argparse.ArgumentParser(description="Download audio from YouTube, Bilibili, or direct MP3 links.")
    parser.add_argument("--csv_file", required=True, help="Path to the CSV file containing download information.")
    parser.add_argument("--output_dir", required=True, help="Directory to save downloaded files.")
    parser.add_argument("--workers", type=int, default=4, help="Number of worker processes (default: 4)")
    return parser.parse_args()

def read_csv(csv_file):
    with open(csv_file, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        return list(reader)


def sanitize_filename(filename):
    return re.sub(r'[\\/*?:"<>|]', "", filename)
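# e.g. sanitize_filename('a/b:c?') -> 'abc' (strips \ / * ? : " < > |)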

def bv2av(x):
    # Decode a full Bilibili BV id (e.g. 'BV1xx411c7XX') into the numeric AV id
    # using the community-documented base-58 scheme.
    table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'
    tr = {table[i]: i for i in range(58)}
    s = [11, 10, 3, 8, 4, 6]
    xor = 177451812
    add = 8728348608
    r = 0
    for i in range(6):
        r += tr[x[s[i]]] * (58 ** i)
    return (r - add) ^ xor

def bv_url_to_av_url(url):
    match = re.search(r'/BV([0-9A-Za-z]+)', url)
    if not match:
        return url
    bv = match.group(0)   # '/BV...'
    av = bv2av(bv[1:])    # strip the leading '/'
    return url.replace(bv, f'/av{av}')
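
# A widely cited test vector from the community write-ups on the BV scheme
# (worth re-verifying locally): bv2av('BV17x411w7KC') should yield 170001,
# i.e. .../video/BV17x411w7KC maps to .../video/av170001.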

# Regular expressions to extract play info
REGEX_PLAY_INFO = r'<script>window\.__playinfo__=(.*?)</script>'
REGEX_INITIAL_STATE = r'__INITIAL_STATE__=(.*?);\(function\(\)'

USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/90.0.4430.212 Safari/537.36"
)
BILIBILI_URL = "https://www.bilibili.com"

def get_play_url_web_page(url):
    # No 'accept-encoding' header is forced here; letting requests negotiate
    # avoids undecodable brotli bodies when brotli support is not installed.
    headers = {
        'User-Agent': USER_AGENT,
        'Referer': BILIBILI_URL,
        'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
    }
    response = requests.get(url, headers=headers)
    html = response.text

    match_initial_state = re.search(REGEX_INITIAL_STATE, html)
    match_play_info = re.search(REGEX_PLAY_INFO, html)
    if not match_initial_state or not match_play_info:
        return None, None

    # Parse the embedded JSON blobs
    initial_state_json = json.loads(match_initial_state.group(1))
    play_info_json = json.loads(match_play_info.group(1))
    return initial_state_json, play_info_json

def parse_video(play_info):
    video_list = play_info['data']['dash']['video']
    # Find the best quality video
    best_video = max(video_list, key=lambda x: x['height'] * x['width'])
    return best_video


def parse_audio(play_info):
    audio_list = play_info['data']['dash']['audio']
    # Choose the highest bitrate audio stream
    best_audio = max(audio_list, key=lambda x: x['bandwidth'])
    return best_audio

def download_file(url, headers, filename):
    # The file is opened in append mode, so resume an interrupted download by
    # requesting only the remaining bytes (assumes the CDN honors Range).
    existing = os.path.getsize(filename) if os.path.exists(filename) else 0
    request_headers = dict(headers)
    if existing:
        request_headers['Range'] = f'bytes={existing}-'
    response = requests.get(url, headers=request_headers, stream=True)
    total = int(response.headers.get('content-length', 0)) + existing
    with open(filename, 'ab') as file, tqdm(
        desc=os.path.basename(filename),
        total=total,
        unit='iB',
        unit_scale=True,
        unit_divisor=1024,
        initial=existing,
    ) as bar:
        for data in response.iter_content(chunk_size=1024):
            size = file.write(data)
            bar.update(size)
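
# Example (hypothetical URL): re-running the same call resumes from the
# existing partial file via the Range header:
#   download_file("https://example.com/audio.m4a",
#                 {"User-Agent": USER_AGENT}, "downloads/audio.m4a")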

def download_bilibili(bilibili_url, output_dir):
    initial_state, play_info = get_play_url_web_page(bilibili_url)
    if not initial_state or not play_info:
        print("Failed to retrieve video information.")
        return None

    video_data = initial_state['videoData']
    bvid = video_data['bvid']
    title = video_data['title']
    unique_name = sanitize_filename(f"[{bvid}]{title}")

    best_audio = parse_audio(play_info)
    audio_url = best_audio['baseUrl']

    # Bilibili's CDN rejects requests without a Referer header.
    headers = {
        'User-Agent': USER_AGENT,
        'Referer': BILIBILI_URL,
    }

    os.makedirs(output_dir, exist_ok=True)

    # Download audio
    audio_filename = os.path.join(output_dir, f"{unique_name}_audio.m4a")
    print(f"Downloading audio to {audio_filename}")
    download_file(audio_url, headers, audio_filename)
    return audio_filename

def get_unique_filename(directory, extension):
    return os.path.join(directory, f"temp_{uuid.uuid4().hex}.{extension}")

def download_youtube(url, output_path):
    # Extract the 11-character video ID from either URL form
    # (youtube.com/watch?v=... or youtu.be/...).
    parsed_url = urlparse(url)
    if parsed_url.hostname and 'youtu.be' in parsed_url.hostname:
        video_id = parsed_url.path.lstrip('/')[:11]
    else:
        video_id = parse_qs(parsed_url.query).get('v', [''])[0][:11]
    url_constructed = f"https://www.youtube.com/watch?v={video_id}"

    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'flac',
        }],
        # output_path is an extension-less base; yt-dlp fills in the download
        # extension and the post-processor swaps it for .flac.
        'outtmpl': output_path + '.%(ext)s',
        'socket_timeout': 300,  # 5 minutes timeout
        'noplaylist': True,  # Ensure only the video is downloaded, not a playlist
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url_constructed])
    return output_path + '.flac'
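
# Note: the FFmpegExtractAudio post-processor shells out to ffmpeg, so ffmpeg
# must be on PATH for download_youtube as well as for convert_to_flac below.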

def download_mp3(url, output_dir):
    try:
        response = requests.get(url)
        response.raise_for_status()
        mp3_path = get_unique_filename(output_dir, "mp3")
        with open(mp3_path, 'wb') as f:
            f.write(response.content)
        return mp3_path
    except Exception as e:
        raise Exception(f"MP3 download failed: {str(e)}")

def convert_to_flac(input_path, output_path):
    try:
        subprocess.run(['ffmpeg', '-y', '-i', input_path, '-c:a', 'flac', output_path], check=True)
    except subprocess.CalledProcessError as e:
        raise Exception(f"FLAC conversion failed: {str(e)}")

def process_row(row, output_dir):
    url = row['Url']
    title = sanitize_filename(row['Title'])
    bonafide_or_deepfake = row['Bonafide Or Deepfake'].lower()
    # Build the target path without an extension; every branch below produces
    # "<base>.flac".
    output_base = os.path.join(output_dir, f"{bonafide_or_deepfake}_{title}")
    flac_path = output_base + '.flac'

    # If this row has already been downloaded, skip it.
    if os.path.exists(flac_path):
        return True, None

    temp_file = None
    try:
        if 'youtube.com' in url or 'youtu.be' in url:
            download_youtube(url, output_base)
        elif 'bilibili.com' in url:
            temp_file = download_bilibili(url, output_dir)
            convert_to_flac(temp_file, flac_path)
        elif urlparse(url).path.endswith('.mp3'):
            temp_file = download_mp3(url, output_dir)
            convert_to_flac(temp_file, flac_path)
        else:
            raise ValueError(f"Unsupported URL: {url}")
        return True, None
    except Exception as e:
        return False, str(e)
    finally:
        if temp_file and os.path.exists(temp_file):
            os.remove(temp_file)

def worker(queue, output_dir, log_queue, progress_queue):
    while True:
        item = queue.get()
        if item is None:  # Sentinel: no more work.
            break
        index, row = item
        success, error = process_row(row, output_dir)
        log_queue.put((index, row['Url'], success, error))
        progress_queue.put(1)
        # Small random delay between downloads to avoid hammering the servers.
        time.sleep(random.uniform(1, 2.5))

def logger(log_queue, log_file):
    with open(log_file, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        while True:
            item = log_queue.get()
            if item is None:
                break
            index, url, success, error = item
            # csv.writer quotes error messages that contain commas.
            writer.writerow([index, url, 'Success' if success else 'Failure', error or ''])
            f.flush()

def progress_tracker(progress_queue, total_rows):
    pbar = tqdm(total=total_rows, desc="Downloading")
    completed = 0
    while completed < total_rows:
        increment = progress_queue.get()
        if increment is None:
            break
        completed += increment
        pbar.update(increment)
    pbar.close()
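
# Design note: the progress bar lives in its own process so that concurrent
# workers never write to the terminal directly; they only push increments onto
# progress_queue, which keeps the tqdm output from interleaving.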

def main():
    args = setup_argparse()
    os.makedirs(args.output_dir, exist_ok=True)

    rows = read_csv(args.csv_file)
    total_rows = len(rows)

    task_queue = mp.Queue()
    log_queue = mp.Queue()
    progress_queue = mp.Queue()

    for i, row in enumerate(rows):
        task_queue.put((i, row))

    log_file = os.path.join(args.output_dir, 'download_log.csv')
    log_process = mp.Process(target=logger, args=(log_queue, log_file))
    log_process.start()

    progress_process = mp.Process(target=progress_tracker, args=(progress_queue, total_rows))
    progress_process.start()

    workers = []
    for _ in range(args.workers):
        p = mp.Process(target=worker, args=(task_queue, args.output_dir, log_queue, progress_queue))
        workers.append(p)
        p.start()

    # One sentinel per worker so every process exits its loop.
    for _ in range(args.workers):
        task_queue.put(None)

    for w in workers:
        w.join()

    log_queue.put(None)
    log_process.join()
    progress_queue.put(None)
    progress_process.join()


if __name__ == "__main__":
    mp.freeze_support()
    main()