Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import subprocess
- import glob
- import whisperx
- from tqdm import tqdm
def has_subtitles(video_path):
    """Return True if the video file already contains at least one subtitle stream.

    Uses ffprobe to list subtitle stream indices; any non-empty output means
    subtitles are present.

    Args:
        video_path: Path to the video file to probe.

    Returns:
        True if a subtitle stream exists, False on any probe failure
        (ffprobe missing, file unreadable, non-zero exit).
    """
    try:
        result = subprocess.check_output([
            'ffprobe', '-v', 'error', '-select_streams', 's',
            '-show_entries', 'stream=index', '-of', 'csv=p=0', video_path
        ])
        return bool(result.strip())
    # Narrowed from a bare `except:` which also swallowed KeyboardInterrupt.
    # CalledProcessError: ffprobe exited non-zero; OSError: ffprobe not found
    # or not executable.
    except (subprocess.CalledProcessError, OSError):
        return False
def extract_audio(video_path, audio_path):
    """Extract the audio track of *video_path* into *audio_path* as WAV.

    Produces 16-bit PCM, 44.1 kHz stereo, discarding the video stream.
    ffmpeg's console output is captured (not shown) and its exit status
    is not checked — callers verify the output file exists instead.
    """
    command = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-vn',
        '-acodec', 'pcm_s16le',
        '-ar', '44100',
        '-ac', '2',
        audio_path,
    ]
    subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def format_time(seconds):
    """Convert a duration in seconds to an SRT timestamp: HH:MM:SS,mmm."""
    whole = int(seconds)
    ms = int((seconds - whole) * 1000)
    hours, rem = divmod(whole, 3600)
    minutes, secs = divmod(rem, 60)
    return f"{hours:02}:{minutes:02}:{secs:02},{ms:03}"
def convert_language_code(lang):
    """Map a two-letter (ISO 639-1) language code to its three-letter
    (ISO 639-2) form used in container metadata.

    Returns None for unsupported codes and for falsy input (None / "").
    """
    if not lang:
        return None
    three_letter = {
        "ru": "rus",
        "en": "eng",
    }
    return three_letter.get(lang.lower())
def transcribe_audio(audio_path):
    """Transcribe and diarize an audio file with WhisperX.

    Pipeline: transcribe with large-v3 -> (optional) word-level alignment ->
    speaker diarization -> render both SRT and a Markdown-ish text transcript.

    Args:
        audio_path: Path to a WAV/audio file readable by whisperx.

    Returns:
        Tuple (srt_content, text_content, metadata_language), or
        (None, None, None) when the detected language is not supported.
    """
    device = "cuda"
    batch_size = 16
    compute_type = "float16"
    model = whisperx.load_model("large-v3", device, compute_type=compute_type)
    audio = whisperx.load_audio(audio_path)
    result = model.transcribe(audio, batch_size=batch_size)
    original_language = result["language"]
    metadata_language = convert_language_code(original_language)
    if metadata_language is None:
        print(f"Язык {original_language} не поддерживается. Пропуск файла.")
        return None, None, None
    try:
        model_a, metadata = whisperx.load_align_model(language_code=original_language, device=device)
    except ValueError:
        print(f"Нет модели выравнивания для языка: {original_language}. Использование без выравнивания.")
        model_a, metadata = None, None
    result = whisperx.align(result["segments"], model_a, metadata, audio, device,
                            return_char_alignments=False) if model_a else result
    # SECURITY: the token was previously hardcoded here and has been committed
    # to a public paste — it must be revoked. Prefer the HF_TOKEN environment
    # variable; the old literal is kept only as a backward-compatible fallback.
    diarize_model = whisperx.DiarizationPipeline(
        use_auth_token=os.environ.get("HF_TOKEN", "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"),
        device=device
    )
    diarize_segments = diarize_model(audio)
    result = whisperx.assign_word_speakers(diarize_segments, result)
    srt_lines = []
    text_lines = []
    index = 1
    for segment in result["segments"]:
        speaker = segment.get("speaker", "Unknown")
        start_time = format_time(segment['start'])
        end_time = format_time(segment['end'])
        # BUGFIX: unaligned results (no alignment model for the language) have
        # no "words" key on segments — segment["words"] raised KeyError.
        # Fall back to the segment's plain text in that case.
        if "words" in segment:
            words = " ".join([word["word"] for word in segment["words"]])
        else:
            words = segment.get("text", "").strip()
        srt_lines.append(f"{index}")
        srt_lines.append(f"{start_time} --> {end_time}")
        srt_lines.append(f"{speaker}: {words}")
        srt_lines.append("")
        text_lines.append(f"`{start_time}-{end_time}` **{speaker}**: {words}")
        index += 1
    return "\n".join(srt_lines), "\n".join(text_lines), metadata_language
def process_pipeline(video_path, output_base):
    """Run the full processing chain for one video.

    Steps: extract audio to a temp WAV -> transcribe/diarize -> write
    <name>.srt and <name>.txt into <output_base>/outputs -> mux subtitles
    into <name>.mp4. The temp WAV is always removed, even on failure.

    Args:
        video_path: Source video file.
        output_base: Directory under which an "outputs" folder is created.
    """
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    temp_dir = os.path.join(os.getcwd(), "temp")
    os.makedirs(temp_dir, exist_ok=True)
    output_dir = os.path.join(output_base, "outputs")
    os.makedirs(output_dir, exist_ok=True)
    temp_audio = os.path.join(temp_dir, f"{base_name}.wav")
    extract_audio(video_path, temp_audio)
    if not os.path.exists(temp_audio):
        print(f"Ошибка: аудиофайл {temp_audio} не был создан. Пропуск файла.")
        return
    # try/finally ensures the temp WAV does not leak if transcription,
    # file writing, or subtitle muxing raises (previously it was only
    # removed on the happy path and the unsupported-language path).
    try:
        srt_content, text_content, lang = transcribe_audio(temp_audio)
        if srt_content is None:
            print(f"Пропущено {video_path} из-за неподдерживаемого языка.")
            return
        output_srt = os.path.join(output_dir, f"{base_name}.srt")
        with open(output_srt, "w", encoding="utf-8") as f:
            f.write(srt_content)
        output_text = os.path.join(output_dir, f"{base_name}.txt")
        with open(output_text, "w", encoding="utf-8") as f:
            f.write(text_content)
        output_video = os.path.join(output_dir, f"{base_name}.mp4")
        burn_subtitles(video_path, output_srt, output_video, lang)
        # if os.path.exists(output_srt): # удаление .srt
        #     os.remove(output_srt)
        print(f"Обработано: {video_path} (Язык: {lang})")
    finally:
        if os.path.exists(temp_audio):
            os.remove(temp_audio)
def burn_subtitles(video_path, srt_path, output_video_path, language):
    """Mux an SRT file into the video as a soft subtitle track.

    Despite the name, subtitles are not burned into the frames: the SRT is
    added as a `mov_text` stream while video is re-encoded with hevc_nvenc
    (CQ 37) and audio is copied. When the language maps to a known
    three-letter code, it is written as stream metadata.
    """
    # ffmpeg filter/path handling prefers forward slashes on Windows.
    normalized_srt = srt_path.replace("\\", "/")
    command = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-i', normalized_srt,
        '-c:v', 'hevc_nvenc', '-cq', '37',
        '-c:a', 'copy',
        '-c:s', 'mov_text',
    ]
    lang_tag = convert_language_code(language)
    if lang_tag:
        command += ['-metadata:s:s:0', f'language={lang_tag}']
    command += [output_video_path]
    subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
def process_folder(folder_path, output_base):
    """Process every unprocessed, subtitle-less video in *folder_path*.

    Collects files matching common video extensions, skips those whose
    output MP4 already exists in <output_base>/outputs or which already
    carry embedded subtitles, then runs the pipeline on the rest.
    """
    extensions = ("*.mp4", "*.mkv", "*.avi", "*.mov", "*.webm")
    found = []
    for pattern in tqdm(extensions, desc="Поиск видеофайлов"):
        found += glob.glob(os.path.join(folder_path, pattern))
    output_dir = os.path.join(output_base, "outputs")
    os.makedirs(output_dir, exist_ok=True)
    pending = []
    for video in tqdm(found, desc="Проверка наличия субтитров и выходных файлов"):
        stem = os.path.splitext(os.path.basename(video))[0]
        already_done = os.path.exists(os.path.join(output_dir, f"{stem}.mp4"))
        if already_done:
            # print(f"\nПропущено: {video} (уже обработано)\n")
            continue
        if not has_subtitles(video):
            pending.append(video)
    for video in tqdm(pending, desc="Обработка видеофайлов"):
        process_pipeline(video, output_base)
if __name__ == "__main__":
    # Script entry point: input and output roots point at the same folder;
    # results land in a nested "outputs" subdirectory.
    INPUT_FOLDER = r"C:\Users\User\Videos"
    OUTPUT_BASE = r"C:\Users\User\Videos"

    process_folder(INPUT_FOLDER, OUTPUT_BASE)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement