"""
Download audio from YouTube, Bilibili, or direct MP3 links listed in a CSV
and convert every file to FLAC.

Example usage: python download.py --csv_file "data.csv" --output_dir "downloads" --workers 4
"""
import argparse
import csv
import hashlib
import multiprocessing as mp
import os
import random
import re
import subprocess
import time
import urllib.request
import uuid
from urllib.parse import urlparse

import requests
import yt_dlp
from tqdm import tqdm

def setup_argparse():
    parser = argparse.ArgumentParser(description="Download audio from YouTube, Bilibili, or direct MP3 links.")
    parser.add_argument("--csv_file", required=True, help="Path to the CSV file containing download information.")
    parser.add_argument("--output_dir", required=True, help="Directory to save downloaded files.")
    parser.add_argument("--workers", type=int, default=4, help="Number of worker processes (default: 4)")
    return parser.parse_args()

def read_csv(csv_file):
    with open(csv_file, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        return list(reader)

def sanitize_filename(filename):
    return re.sub(r'[\\/*?:"<>|]', "", filename)

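# Example: characters illegal in Windows filenames are simply dropped:
#   sanitize_filename('my:song?*.mp3')  ->  'mysong.mp3'
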
def bv2av(x):
    # Decode a Bilibili BV id into its numeric av id using the
    # community-documented scheme: six base-58 digits read from fixed
    # positions of the 12-character id, then an additive offset and XOR mask.
    table = 'fZodR9XQDSUm21yCkr6zBqiveYah8bt4xsWpHnJE7jL5VG3guMTKNPAwcF'
    tr = {table[i]: i for i in range(58)}
    s = [11, 10, 3, 8, 4, 6]
    xor = 177451812
    add = 8728348608
    r = 0
    for i in range(6):
        r += tr[x[s[i]]] * (58 ** i)
    return (r - add) ^ xor

def bv_url_to_av_url(url):
    # Rewrite /video/BVxxxx URLs into the equivalent /video/av<number> form.
    def replacer(match):
        bv = match.group(1)
        av_code = bv2av(bv)
        return match.group(0).replace(bv, 'av' + str(av_code))
    return re.sub(r'/video/(BV([a-zA-Z0-9]+))', replacer, url)

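# Worked example, using the widely circulated av/BV test pair
# (av170001 <-> BV17x411w7KC):
#   bv2av('BV17x411w7KC')  ->  170001
#   bv_url_to_av_url('https://www.bilibili.com/video/BV17x411w7KC')
#       ->  'https://www.bilibili.com/video/av170001'
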
def get_play_list(start_url, cid, quality):
    # The appkey and secret are stored obfuscated: reversing the string and
    # shifting every character by +2 yields "appkey:secret".
    entropy = 'rbMCKn@KuamXWlPMoJGsKcbiJKUfkPF_8dABscJntvqhRSETg'
    appkey, sec = ''.join([chr(ord(i) + 2) for i in entropy[::-1]]).split(':')
    params = f'appkey={appkey}&cid={cid}&otype=json&qn={quality}&quality={quality}&type='
    # The legacy playurl API requires an MD5 signature over params + secret.
    chksum = hashlib.md5(bytes(params + sec, 'utf8')).hexdigest()
    url_api = f'https://interface.bilibili.com/v2/playurl?{params}&sign={chksum}'
    headers = {
        'Referer': start_url,
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36'
    }
    response = requests.get(url_api, headers=headers)
    video_list = []
    if response.status_code == 200:
        html = response.json()
        video_list = [i['url'] for i in html['durl']]
    return video_list

def download_bilibili(url, output_path):
    try:
        start_url = bv_url_to_av_url(url)
        # An optional ?p=N suffix selects one part of a multi-part video.
        p_id = re.search(r'\?p=(\d+)', start_url).group(1) if '?p=' in start_url else None
        aid = re.search(r'/av(\d+)/*', start_url).group(1)
        api_url = f'https://api.bilibili.com/x/web-interface/view?aid={aid}'
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36'
        }
        response = requests.get(api_url, headers=headers)
        if response.status_code == 200:
            data = response.json()['data']
            page = data['pages'][int(p_id) - 1] if p_id else data['pages'][0]
            cid = str(page['cid'])
            # The API title is unused here; final names come from the CSV in process_row.
            title = page['part'] or data["title"].replace(" ", "_")
            title = sanitize_filename(title)
            video_list = get_play_list(start_url, cid, quality=80)
            if video_list:
                # The CDN is picky about headers: mimic a browser, send the
                # Referer/Origin it expects, and request the full byte range.
                opener = urllib.request.build_opener()
                opener.addheaders = [
                    ('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.13; rv:56.0) Gecko/20100101 Firefox/56.0'),
                    ('Accept', '*/*'),
                    ('Accept-Language', 'en-US,en;q=0.5'),
                    ('Accept-Encoding', 'gzip, deflate, br'),
                    ('Range', 'bytes=0-'),
                    ('Referer', start_url),
                    ('Origin', 'https://www.bilibili.com'),
                    ('Connection', 'keep-alive'),
                ]
                urllib.request.install_opener(opener)
                temp_file = get_unique_filename(output_path, "flv")
                # Only the first segment is fetched; multi-segment videos would
                # need every entry of video_list concatenated.
                urllib.request.urlretrieve(url=video_list[0], filename=temp_file)
                return temp_file
            else:
                raise Exception("No video URL found")
        else:
            raise Exception(f"Failed to fetch video info: HTTP {response.status_code}")
    except Exception as e:
        raise Exception(f"Bilibili download failed: {str(e)}")

def get_unique_filename(directory, extension):
    # Random UUID-based names avoid collisions between worker processes.
    return os.path.join(directory, f"temp_{uuid.uuid4().hex}.{extension}")

def download_youtube(url, output_path):
    # Let yt-dlp keep the native extension during download; FFmpegExtractAudio
    # then writes the MP3 next to it as temp_base + '.mp3'. Hard-coding '.mp3'
    # into outtmpl confuses the postprocessor's output naming.
    temp_base = os.path.join(output_path, f"temp_{uuid.uuid4().hex}")
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': temp_base + '.%(ext)s',
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
    return temp_base + '.mp3'

def download_mp3(url, output_path):
    try:
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        mp3_path = get_unique_filename(output_path, "mp3")
        with open(mp3_path, 'wb') as f:
            f.write(response.content)
        return mp3_path
    except Exception as e:
        raise Exception(f"MP3 download failed: {str(e)}")

def convert_to_flac(input_path, output_path):
    # Each downloader returns the real path of its temp file, and ffmpeg
    # detects the input container itself, so the same call handles the .flv
    # and .mp3 temp files alike. -y overwrites any stale output from a
    # previous run instead of hanging on ffmpeg's overwrite prompt.
    try:
        subprocess.run(['ffmpeg', '-y', '-i', input_path, '-c:a', 'flac', output_path], check=True)
    except subprocess.CalledProcessError as e:
        raise Exception(f"FLAC conversion failed: {str(e)}")

def process_row(row, output_dir):
    url = row['Url']
    title = sanitize_filename(row['Title'])
    bonafide_or_deepfake = row['Bonafide Or Deepfake'].lower()
    output_filename = f"{bonafide_or_deepfake}_{title}.flac"
    output_path = os.path.join(output_dir, output_filename)
    temp_file = None
    try:
        if 'youtube.com' in url or 'youtu.be' in url:
            temp_file = download_youtube(url, output_dir)
        elif 'bilibili.com' in url:
            temp_file = download_bilibili(url, output_dir)
        elif urlparse(url).path.endswith('.mp3'):
            temp_file = download_mp3(url, output_dir)
        else:
            raise ValueError(f"Unsupported URL: {url}")
        convert_to_flac(temp_file, output_path)
        return True, None
    except Exception as e:
        return False, str(e)
    finally:
        # Every downloader returns the actual temp-file path, so one cleanup
        # suffices; no extension guessing needed.
        if temp_file and os.path.exists(temp_file):
            os.remove(temp_file)

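# Naming example: a row {'Url': ..., 'Title': 'My Song',
# 'Bonafide Or Deepfake': 'Deepfake'} is written to 'deepfake_My Song.flac'
# inside output_dir.
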
def worker(queue, output_dir, log_queue, progress_queue):
    # Pull (index, row) tasks until a None sentinel arrives.
    while True:
        item = queue.get()
        if item is None:
            break
        index, row = item
        success, error = process_row(row, output_dir)
        log_queue.put((index, row['Url'], success, error))
        progress_queue.put(1)
        # Brief random pause between downloads to avoid hammering the sites.
        time.sleep(random.uniform(0.5, 1.5))

def logger(log_queue, log_file):
    # A single logger process serializes writes from all workers. csv.writer
    # quotes fields properly, since URLs and error messages can contain commas.
    with open(log_file, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['index', 'url', 'status', 'error'])
        while True:
            item = log_queue.get()
            if item is None:
                break
            index, url, success, error = item
            writer.writerow([index, url, 'Success' if success else 'Failure', error or ''])
            f.flush()

def progress_tracker(progress_queue, total_rows):
    pbar = tqdm(total=total_rows, desc="Downloading")
    completed = 0
    while completed < total_rows:
        increment = progress_queue.get()
        if increment is None:
            break
        completed += increment
        pbar.update(increment)
    pbar.close()

def main():
    args = setup_argparse()
    os.makedirs(args.output_dir, exist_ok=True)
    rows = read_csv(args.csv_file)
    total_rows = len(rows)
    task_queue = mp.Queue()
    log_queue = mp.Queue()
    progress_queue = mp.Queue()
    for i, row in enumerate(rows):
        task_queue.put((i, row))
    log_file = os.path.join(args.output_dir, 'download_log.csv')
    log_process = mp.Process(target=logger, args=(log_queue, log_file))
    log_process.start()
    progress_process = mp.Process(target=progress_tracker, args=(progress_queue, total_rows))
    progress_process.start()
    workers = []
    for _ in range(args.workers):
        p = mp.Process(target=worker, args=(task_queue, args.output_dir, log_queue, progress_queue))
        workers.append(p)
        p.start()
    # One None sentinel per worker shuts the pool down cleanly.
    for _ in range(args.workers):
        task_queue.put(None)
    for w in workers:
        w.join()
    log_queue.put(None)
    log_process.join()
    progress_queue.put(None)
    progress_process.join()


if __name__ == "__main__":
    mp.freeze_support()
    main()
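
# Minimal single-row smoke test (hypothetical values), bypassing the
# multiprocessing machinery entirely:
#
#   row = {'Url': 'https://www.youtube.com/watch?v=XXXXXXXXXXX',
#          'Title': 'test clip', 'Bonafide Or Deepfake': 'Bonafide'}
#   ok, err = process_row(row, 'downloads')
#   print(ok, err)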