import argparse
import csv
import hashlib
import json
import multiprocessing as mp
import os
import random
import re
import subprocess
import time
import urllib.request
from urllib.parse import urlparse
import uuid

import requests
from tqdm import tqdm
import yt_dlp

"""
Download audio from YouTube, Bilibili, or direct MP3 links listed in a CSV
and convert each item to FLAC.

Example usage:
python download.py --csv_file "data.csv" --output_dir "downloads" --workers 4
"""


def setup_argparse():
    """Parse and return the command-line arguments."""
    parser = argparse.ArgumentParser(
        description="Download audio from YouTube, Bilibili, or direct MP3 links.")
    parser.add_argument("--csv_file",
                        help="Path to the CSV file containing download information.")
    parser.add_argument("--output_dir",
                        help="Directory to save downloaded files.")
    parser.add_argument("--workers", type=int, default=4,
                        help="Number of worker processes (default: 4)")
    return parser.parse_args()


def read_csv(csv_file):
    """Read the input CSV and return its rows as a list of dicts."""
    with open(csv_file, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        return list(reader)


def sanitize_filename(filename):
    """Strip characters that are illegal in filenames on common filesystems."""
    return re.sub(r'[\\/*?:"<>|]', "", filename)


def bv2av(x):
    """Convert a Bilibili BV video id to its numeric AV id.

    Implements the well-known base-58 descrambling of the BV code
    (fixed character positions, XOR and additive constants).
    """
    table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'
    tr = {table[i]: i for i in range(58)}
    s = [11, 10, 3, 8, 4, 6]
    xor = 177451812
    add = 8728348608
    r = 0
    for i in range(6):
        r += tr[x[s[i]]] * (58 ** i)
    return (r - add) ^ xor


def bv_url_to_av_url(url):
    """Rewrite a /video/BVxxxx URL into the equivalent /video/avNNN URL."""
    def replacer(match):
        bv = match.group(1)
        av_code = bv2av(bv)
        return match.group(0).replace(bv, 'av' + str(av_code))
    return re.sub(r'/video/(BV([a-zA-Z0-9]+))', replacer, url)


def get_play_list(start_url, cid, quality):
    """Query Bilibili's playurl API and return the list of stream URLs.

    The app key/secret pair is stored lightly obfuscated (each char shifted
    by 2, string reversed) and the request is signed with an MD5 checksum,
    matching the legacy interface.bilibili.com v2 API contract.
    """
    entropy = 'rbMCKn@KuamXWlPMoJGsKcbiJKUfkPF_8dABscJntvqhRSETg'
    appkey, sec = ''.join([chr(ord(i) + 2) for i in entropy[::-1]]).split(':')
    params = f'appkey={appkey}&cid={cid}&otype=json&qn={quality}&quality={quality}&type='
    chksum = hashlib.md5(bytes(params + sec, 'utf8')).hexdigest()
    url_api = f'https://interface.bilibili.com/v2/playurl?{params}&sign={chksum}'
    headers = {
        'Referer': start_url,
        # NOTE: this header was split across physical lines in the original
        # (a SyntaxError); it must be a single string.
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36'
    }
    response = requests.get(url_api, headers=headers, timeout=30)
    video_list = []
    if response.status_code == 200:
        html = response.json()
        video_list = [i['url'] for i in html['durl']]
    return video_list


def download_bilibili(url, output_path):
    """Download the first stream segment of a Bilibili video as a .flv file.

    Returns the path of the downloaded temp file; raises on any failure.
    """
    try:
        start_url = bv_url_to_av_url(url)
        # Optional ?p=N selects a specific part of a multi-part video.
        p_id = re.search(r'\?p=(\d+)', start_url).group(1) if '?p=' in start_url else None
        aid = re.search(r'/av(\d+)/*', start_url).group(1)
        api_url = f'https://api.bilibili.com/x/web-interface/view?aid={aid}'
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36'
        }
        response = requests.get(api_url, headers=headers, timeout=30)
        if response.status_code == 200:
            data = response.json()['data']
            cid_list = data['pages'][int(p_id) - 1] if p_id else data['pages'][0]
            cid = str(cid_list['cid'])
            title = cid_list['part'] or data["title"].replace(" ", "_")
            title = sanitize_filename(title)
            video_list = get_play_list(start_url, cid, quality=80)
            if video_list:
                # The CDN requires Referer/Origin/Range headers or it 403s.
                opener = urllib.request.build_opener()
                opener.addheaders = [
                    ('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:56.0) '
                                   'Gecko/20100101 Firefox/56.0'),
                    ('Accept', '*/*'),
                    ('Accept-Language', 'en-US,en;q=0.5'),
                    ('Accept-Encoding', 'gzip, deflate, br'),
                    ('Range', 'bytes=0-'),
                    ('Referer', start_url),
                    ('Origin', 'https://www.bilibili.com'),
                    ('Connection', 'keep-alive'),
                ]
                urllib.request.install_opener(opener)
                temp_file = get_unique_filename(output_path, "flv")
                urllib.request.urlretrieve(url=video_list[0], filename=temp_file)
                return temp_file
            else:
                raise Exception("No video URL found")
        else:
            raise Exception(f"Failed to fetch video info: HTTP {response.status_code}")
    except Exception as e:
        raise Exception(f"Bilibili download failed: {str(e)}")


def get_unique_filename(directory, extension):
    """Return a collision-free temp file path inside *directory*."""
    return os.path.join(directory, f"temp_{uuid.uuid4().hex}.{extension}")


def download_youtube(url, output_path):
    """Download the audio track of a YouTube video and return the .mp3 path.

    Uses an extension-less output template plus '.%(ext)s' so yt-dlp's
    FFmpegExtractAudio post-processor produces exactly '<base>.mp3'.
    (A template that already ends in '.mp3' can yield '<base>.mp3.mp3' or
    an in-place skip, which is what forced the '+= ".mp3"' hacks elsewhere.)
    """
    base = os.path.join(output_path, f"temp_{uuid.uuid4().hex}")
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': base + '.%(ext)s',
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
    return base + '.mp3'


def download_mp3(url, output_path):
    """Stream a direct MP3 link to disk and return the saved file path.

    Streams in chunks instead of buffering the full payload in memory, and
    uses a timeout so a stalled server cannot hang a worker forever.
    """
    try:
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        mp3_path = get_unique_filename(output_path, "mp3")
        with open(mp3_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1 << 16):
                f.write(chunk)
        return mp3_path
    except Exception as e:
        raise Exception(f"MP3 download failed: {str(e)}")


def convert_to_flac(input_path, output_path):
    """Convert a downloaded audio file to FLAC with ffmpeg.

    Bug fix: the original unconditionally appended ".mp3" to *input_path*,
    which pointed ffmpeg at a nonexistent file for Bilibili (.flv) and
    direct-MP3 downloads. Use the path as given; only fall back to the
    ".mp3"-suffixed name if that is the file that actually exists.
    """
    if not os.path.exists(input_path) and os.path.exists(input_path + ".mp3"):
        input_path += ".mp3"
    try:
        # '-y' overwrites an existing output instead of prompting; without it
        # ffmpeg blocks on stdin and the worker process hangs.
        subprocess.run(['ffmpeg', '-y', '-i', input_path, '-c:a', 'flac', output_path],
                       check=True)
    except subprocess.CalledProcessError as e:
        raise Exception(f"FLAC conversion failed: {str(e)}")


def process_row(row, output_dir):
    """Download one CSV row's audio and convert it to FLAC.

    Returns (True, None) on success, (False, error_message) on failure.
    Temp files are always cleaned up, whichever branch ran.
    """
    url = row['Url']
    title = sanitize_filename(row['Title'])
    bonafide_or_deepfake = row['Bonafide Or Deepfake'].lower()
    output_filename = f"{bonafide_or_deepfake}_{title}.flac"
    output_path = os.path.join(output_dir, output_filename)
    temp_file = None
    try:
        if 'youtube.com' in url or 'youtu.be' in url:
            temp_file = download_youtube(url, output_dir)
        elif 'bilibili.com' in url:
            temp_file = download_bilibili(url, output_dir)
        elif urlparse(url).path.endswith('.mp3'):
            temp_file = download_mp3(url, output_dir)
        else:
            raise ValueError(f"Unsupported URL: {url}")
        convert_to_flac(temp_file, output_path)
        return True, None
    except Exception as e:
        return False, str(e)
    finally:
        if temp_file:
            # Remove the temp file under both its returned name and the
            # legacy ".mp3"-suffixed name some downloads may have produced.
            for candidate in (temp_file, temp_file + ".mp3"):
                if os.path.exists(candidate):
                    os.remove(candidate)


def worker(queue, output_dir, log_queue, progress_queue):
    """Worker loop: pull (index, row) tasks until a None sentinel arrives."""
    while True:
        item = queue.get()
        if item is None:
            break
        index, row = item
        success, error = process_row(row, output_dir)
        log_queue.put((index, row['Url'], success, error))
        progress_queue.put(1)
        # Small randomized pause between downloads to avoid hammering hosts.
        time.sleep(random.uniform(0.5, 1.5))


def logger(log_queue, log_file):
    """Logger process: append one CSV record per finished task until None.

    Uses csv.writer so error messages containing commas or quotes do not
    corrupt the log (the original wrote raw comma-joined strings).
    """
    with open(log_file, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        while True:
            item = log_queue.get()
            if item is None:
                break
            index, url, success, error = item
            writer.writerow([index, url,
                             'Success' if success else 'Failure',
                             error if error else ''])
            f.flush()


def progress_tracker(progress_queue, total_rows):
    """Progress process: advance a tqdm bar by each increment received."""
    pbar = tqdm(total=total_rows, desc="Downloading")
    completed = 0
    while completed < total_rows:
        increment = progress_queue.get()
        if increment is None:
            break
        completed += increment
        pbar.update(increment)
    pbar.close()


def main():
    """Wire up the task/log/progress queues and run the worker pool."""
    args = setup_argparse()
    os.makedirs(args.output_dir, exist_ok=True)
    rows = read_csv(args.csv_file)
    total_rows = len(rows)

    task_queue = mp.Queue()
    log_queue = mp.Queue()
    progress_queue = mp.Queue()

    for i, row in enumerate(rows):
        task_queue.put((i, row))

    log_file = os.path.join(args.output_dir, 'download_log.csv')
    log_process = mp.Process(target=logger, args=(log_queue, log_file))
    log_process.start()

    progress_process = mp.Process(target=progress_tracker,
                                  args=(progress_queue, total_rows))
    progress_process.start()

    workers = []
    for _ in range(args.workers):
        p = mp.Process(target=worker,
                       args=(task_queue, args.output_dir, log_queue, progress_queue))
        workers.append(p)
        p.start()

    # One sentinel per worker so every worker loop terminates.
    for _ in range(args.workers):
        task_queue.put(None)

    for w in workers:
        w.join()

    log_queue.put(None)
    log_process.join()

    progress_queue.put(None)
    progress_process.join()


if __name__ == "__main__":
    mp.freeze_support()
    main()