PyK0T1K

VideoToText

Mar 27th, 2025
import os
import subprocess
import glob
import whisperx
from tqdm import tqdm


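# Pipeline overview: scan a folder for videos that have no subtitle stream,
# extract their audio, transcribe and diarize it with whisperx, write .srt and
# .txt transcripts, then mux the subtitles into a new .mp4.
# Assumed environment (not guaranteed by the paste itself): ffmpeg/ffprobe on
# PATH, an NVIDIA GPU (CUDA for whisperx, hevc_nvenc for the re-encode), the
# whisperx and tqdm packages installed, and a Hugging Face token with access
# to the pyannote diarization models.
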
def has_subtitles(video_path):
    try:
        result = subprocess.check_output([
            'ffprobe', '-v', 'error', '-select_streams', 's',
            '-show_entries', 'stream=index', '-of', 'csv=p=0', video_path
        ])
        return bool(result.strip())
    except (subprocess.CalledProcessError, FileNotFoundError):
        return False


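# Decode the video's audio track to 16-bit PCM WAV (44.1 kHz, stereo).
# whisperx.load_audio resamples the file again when it loads it, so the
# sample rate chosen here mainly affects the size of the temporary WAV.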
def extract_audio(video_path, audio_path):
    subprocess.run([
        'ffmpeg', '-y', '-i', video_path, '-vn',
        '-acodec', 'pcm_s16le', '-ar', '44100', '-ac', '2', audio_path
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)


def format_time(seconds):
    millisec = int((seconds - int(seconds)) * 1000)
    return f"{int(seconds // 3600):02}:{int((seconds % 3600) // 60):02}:{int(seconds % 60):02},{millisec:03}"


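# Map the two-letter (ISO 639-1) code returned by whisperx to the three-letter
# (ISO 639-2) code that ffmpeg expects in the subtitle-track language tag.
# Only Russian and English are mapped; any other detected language makes the
# caller skip the file.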
def convert_language_code(lang):
    mapping = {
        "ru": "rus",
        "en": "eng",
    }
    return mapping.get(lang.lower()) if lang else None


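# Transcription flow: load the large-v3 model on the GPU, transcribe, run
# word-level alignment when an alignment model exists for the detected
# language, then diarize with the pyannote-based pipeline and attach speaker
# labels to each segment. Returns the SRT text, a plain-text transcript with
# speaker tags, and the ffmpeg language code (or three Nones for an
# unsupported language). Note that the Hugging Face token is hard-coded below.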
def transcribe_audio(audio_path):
    device = "cuda"
    batch_size = 16
    compute_type = "float16"
    model = whisperx.load_model("large-v3", device, compute_type=compute_type)
    audio = whisperx.load_audio(audio_path)
    result = model.transcribe(audio, batch_size=batch_size)

    original_language = result["language"]
    metadata_language = convert_language_code(original_language)

    if metadata_language is None:
        print(f"Language {original_language} is not supported. Skipping file.")
        return None, None, None

    try:
        model_a, metadata = whisperx.load_align_model(language_code=original_language, device=device)
    except ValueError:
        print(f"No alignment model for language: {original_language}. Proceeding without alignment.")
        model_a, metadata = None, None

    result = whisperx.align(result["segments"], model_a, metadata, audio, device,
                            return_char_alignments=False) if model_a else result

    diarize_model = whisperx.DiarizationPipeline(
        use_auth_token="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", device=device
    )
    diarize_segments = diarize_model(audio)
    result = whisperx.assign_word_speakers(diarize_segments, result)

    srt_lines = []
    text_lines = []
    index = 1

    for segment in result["segments"]:
        speaker = segment.get("speaker", "Unknown")
        start_time = format_time(segment['start'])
        end_time = format_time(segment['end'])
        words = " ".join([word["word"] for word in segment["words"]])
        srt_lines.append(f"{index}")
        srt_lines.append(f"{start_time} --> {end_time}")
        srt_lines.append(f"{speaker}: {words}")
        srt_lines.append("")
        text_lines.append(f"`{start_time}-{end_time}` **{speaker}**: {words}")
        index += 1

    return "\n".join(srt_lines), "\n".join(text_lines), metadata_language


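# Per-file pipeline: extract audio into ./temp, transcribe it, write
# <name>.srt and <name>.txt into <output_base>/outputs, mux the subtitles
# into <name>.mp4, and remove the temporary WAV. Files in unsupported
# languages are skipped after cleanup.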
def process_pipeline(video_path, output_base):
    base_name = os.path.splitext(os.path.basename(video_path))[0]

    temp_dir = os.path.join(os.getcwd(), "temp")
    os.makedirs(temp_dir, exist_ok=True)

    output_dir = os.path.join(output_base, "outputs")
    os.makedirs(output_dir, exist_ok=True)

    temp_audio = os.path.join(temp_dir, f"{base_name}.wav")
    extract_audio(video_path, temp_audio)

    if not os.path.exists(temp_audio):
        print(f"Error: audio file {temp_audio} was not created. Skipping file.")
        return

    srt_content, text_content, lang = transcribe_audio(temp_audio)

    if srt_content is None:
        print(f"Skipped {video_path} due to unsupported language.")
        if os.path.exists(temp_audio):
            os.remove(temp_audio)
        return

    output_srt = os.path.join(output_dir, f"{base_name}.srt")
    with open(output_srt, "w", encoding="utf-8") as f:
        f.write(srt_content)

    output_text = os.path.join(output_dir, f"{base_name}.txt")
    with open(output_text, "w", encoding="utf-8") as f:
        f.write(text_content)

    output_video = os.path.join(output_dir, f"{base_name}.mp4")
    burn_subtitles(video_path, output_srt, output_video, lang)

    if os.path.exists(temp_audio):
        os.remove(temp_audio)

    # if os.path.exists(output_srt):  # delete the .srt
    #     os.remove(output_srt)

    print(f"Processed: {video_path} (language: {lang})")


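# Despite the name, this does not hard-burn subtitles into the picture: the
# SRT is muxed as a soft mov_text subtitle track while the video is re-encoded
# with hevc_nvenc at CQ 37 and the audio is stream-copied. An NVENC-capable
# NVIDIA GPU is required for the encode.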
def burn_subtitles(video_path, srt_path, output_video_path, language):
    srt_path_filter = srt_path.replace("\\", "/")
    command = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-i', srt_path_filter,
        '-c:v', 'hevc_nvenc', '-cq', '37',
        '-c:a', 'copy',
        '-c:s', 'mov_text'
    ]

    mapped_language = convert_language_code(language)
    if mapped_language:
        command.extend(['-metadata:s:s:0', f'language={mapped_language}'])

    command.append(output_video_path)

    subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)


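# Folder pass: collect videos by extension, skip any file whose output .mp4
# already exists or that already carries a subtitle stream, and run the
# pipeline on the rest.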
def process_folder(folder_path, output_base):
    video_extensions = ["*.mp4", "*.mkv", "*.avi", "*.mov", "*.webm"]
    video_files = []

    for ext in tqdm(video_extensions, desc="Searching for video files"):
        video_files.extend(glob.glob(os.path.join(folder_path, ext)))

    output_dir = os.path.join(output_base, "outputs")
    os.makedirs(output_dir, exist_ok=True)

    files_to_process = []
    for video in tqdm(video_files, desc="Checking for subtitles and existing outputs"):
        base_name = os.path.splitext(os.path.basename(video))[0]
        output_video_path = os.path.join(output_dir, f"{base_name}.mp4")

        if os.path.exists(output_video_path):
            # print(f"\nSkipped: {video} (already processed)\n")
            continue

        if not has_subtitles(video):
            files_to_process.append(video)

    for video in tqdm(files_to_process, desc="Processing video files"):
        process_pipeline(video, output_base)


if __name__ == "__main__":
    INPUT_FOLDER = r"C:\Users\User\Videos"
    OUTPUT_BASE = r"C:\Users\User\Videos"
    process_folder(INPUT_FOLDER, OUTPUT_BASE)

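# Usage: set INPUT_FOLDER and OUTPUT_BASE above, then run the script directly,
# e.g. `python videototext.py` (the filename is arbitrary). Outputs land in
# <OUTPUT_BASE>/outputs as <video>.srt, <video>.txt and <video>.mp4;
# temporary WAV files are written to ./temp in the working directory.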