Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from openai import OpenAI
- import os
- import subprocess
- import argparse
- import chardet
- import re
def detect_encoding(file_path):
    """Guess the text encoding of the file at *file_path*.

    The file is read in full as bytes and passed to ``chardet.detect``;
    the ``'encoding'`` entry of the detection result is returned as-is.
    """
    with open(file_path, 'rb') as handle:
        guess = chardet.detect(handle.read())
    return guess['encoding']
def read_srt(file_path):
    """Return the lines of an SRT file, decoded using its detected encoding.

    Line endings are kept on each returned line.
    """
    detected = detect_encoding(file_path)
    with open(file_path, 'r', encoding=detected) as srt:
        lines = srt.readlines()
    return lines
def clean_srt_lines(srt_lines):
    """Filter out blank lines and lines that consist only of digits.

    Lines are returned unmodified (including their line endings); only
    whitespace-only lines and all-digit lines such as SRT cue numbers are
    dropped.
    """
    def _is_content(line):
        stripped = line.strip()
        return bool(stripped) and not stripped.isdigit()

    return [line for line in srt_lines if _is_content(line)]
def extract_relevant_parts(srt_lines, keywords=None, lines_before=4, lines_after=100):
    """Collect the transcript lines surrounding sponsor-related keywords.

    Every line containing one of *keywords* (case-insensitive substring
    match) contributes a window of *lines_before* lines before it and
    *lines_after* lines after it. Windows that overlap are merged, so each
    transcript line appears at most once and in its original order.

    Args:
        srt_lines: Sequence of transcript lines (line endings included).
        keywords: Phrases to search for; a built-in sponsor-related list is
            used when omitted.
        lines_before: Number of context lines kept before each hit.
        lines_after: Number of context lines kept after each hit.

    Returns:
        The selected lines concatenated into one string (also printed).
    """
    if keywords is None:
        keywords = ["this episode", "sponsor", "promotion", "brought to you", "presented by",
                    "message from", "partnered with", "powered by", "supported by"]
    keywords = [keyword.lower() for keyword in keywords]

    relevant_lines = []
    # Index of the first line not yet emitted; prevents overlapping windows
    # from appending the same lines again.
    covered_until = 0
    for i, line in enumerate(srt_lines):
        lowered = line.lower()
        if not any(keyword in lowered for keyword in keywords):
            continue
        start = max(i - lines_before, covered_until, 0)
        end = min(i + lines_after + 1, len(srt_lines))
        if start < end:
            relevant_lines.extend(srt_lines[start:end])
            covered_until = end

    relevant_text = "".join(relevant_lines)
    print(relevant_text)
    return relevant_text
def get_sponsored_segments(relevant_srt_content):
    """Ask the chat model for the timestamps of sponsored segments.

    Args:
        relevant_srt_content: Transcript excerpt (SRT text) to analyse.

    Returns:
        A newline-joined string containing only the reply lines that include
        the ``" --> "`` separator, each stripped of surrounding whitespace.
        Empty when the model returns no such lines.
    """
    # Do not embed the API key in source: with no explicit api_key the OpenAI
    # client reads it from the OPENAI_API_KEY environment variable.
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        temperature=0.3,
        seed=1000,
        messages=[
            {"role": "system", "content": "Identify the sponsored segments of the podcast transcription and only output the to-and-from time stamps of the segments. Provide the entirety of segments in one line, like '00:05:18,520 --> 00:07:24,760'. If there are less than 00:00:30 between the start and stop of two segments, combine them into one. One segment per line. ONLY output the timestamps, NOTHING else. If the transcription seems to end before the end of the segment, just provide the last timestamp, as the end of the segment."},
            {"role": "user", "content": relevant_srt_content},
        ],
    )
    # The message content may be None (e.g. a refusal); treat it as empty.
    reply = response.choices[0].message.content or ""
    # Keep only lines with timestamps, without stray whitespace or '\r'.
    candidates = (line.strip() for line in reply.splitlines())
    return "\n".join(line for line in candidates if " --> " in line)
def convert_timestamp(timestamp):
    """Return *timestamp* with every comma replaced by a dot.

    Turns the SRT millisecond separator (``00:05:18,520``) into the dotted
    form (``00:05:18.520``).
    """
    pieces = timestamp.split(',')
    return '.'.join(pieces)
def validate_timestamp_format(timestamp):
    """Return True if *timestamp* is exactly of the form ``HH:MM:SS,mmm``.

    The whole string must match: ``re.fullmatch`` is used because ``$`` with
    ``re.match`` would also accept a trailing newline. ``re.ASCII`` restricts
    ``\\d`` to the digits 0-9.
    """
    pattern = r'\d{2}:\d{2}:\d{2},\d{3}'
    return re.fullmatch(pattern, timestamp, flags=re.ASCII) is not None
def validate_and_convert_timestamp(timestamp):
    """Validate an SRT timestamp and return its converted form.

    Raises:
        ValueError: If *timestamp* fails ``validate_timestamp_format``.
    """
    # Guard clause: reject malformed input before converting.
    if not validate_timestamp_format(timestamp):
        raise ValueError(f"Invalid timestamp format: {timestamp}")
    return convert_timestamp(timestamp)
def run_ffmpeg_command(command):
    """Run *command* and return the resulting ``CompletedProcess``.

    stdout and stderr are captured as text. When the command exits with a
    non-zero status, the command line and its stderr are printed; no
    exception is raised for a failing exit status.

    Args:
        command: The program and its arguments as a list of strings.

    Returns:
        The ``subprocess.CompletedProcess`` with ``stdout``/``stderr`` as str.
    """
    result = subprocess.run(
        command,
        capture_output=True,
        text=True,
        encoding='utf-8',
        # Output may contain bytes that are not valid UTF-8 (e.g. media
        # metadata); substitute them instead of raising UnicodeDecodeError.
        errors='replace',
    )
    if result.returncode != 0:
        print(f"Error running command: {' '.join(command)}")
        print(result.stderr)
    return result
def validate_and_split_timestamps(line):
    """Parse one ``start --> end`` line into converted timestamps.

    Whitespace around each timestamp (including a trailing ``\\r``) is
    ignored, so a stray space does not cause a valid segment to be skipped.

    Args:
        line: Text expected to hold two SRT timestamps separated by ``-->``.

    Returns:
        ``(start, end)`` as returned by ``validate_and_convert_timestamp``,
        or ``(None, None)`` when the line is malformed or start is not
        strictly before end (a message with the reason is printed).
    """
    try:
        # Unpacking raises ValueError unless there are exactly two parts.
        start, end = (part.strip() for part in line.split("-->"))
        start = validate_and_convert_timestamp(start)
        end = validate_and_convert_timestamp(end)
        # Fixed-width, zero-padded timestamps compare correctly as strings.
        if start >= end:
            raise ValueError(f"Invalid timestamp logic: start {start} is not before end {end}")
        return start, end
    except ValueError as e:
        print(f"Skipping invalid timestamp line: {line} ({e})")
        return None, None
def _timestamp_to_seconds(timestamp):
    """Convert a colon-separated timestamp such as ``00:05:18.520`` to seconds."""
    return sum(
        float(field) * 60 ** power
        for power, field in enumerate(reversed(timestamp.split(":")))
    )


def remove_segments(audio_file, segments):
    """Cut the given segments out of *audio_file* using ffmpeg.

    The audio between the segments is extracted into temporary part files
    (stream copy, no re-encoding), the parts are concatenated into
    ``<base>_edited.mp3``, and the temporary files are removed afterwards.

    Args:
        audio_file: Path to the source audio file.
        segments: Newline-separated ``start --> end`` lines with SRT
            timestamps. Blank lines are ignored and invalid lines are
            skipped. Segments may be given in any order and may overlap.

    Returns:
        ``(output_file, total_saved_time)`` where *total_saved_time* is the
        number of seconds covered by the removed segments (overlaps are
        counted once).
    """
    base_name = os.path.splitext(audio_file)[0]
    output_file = f"{base_name}_edited.mp3"
    # Per-input list name so concurrent runs from one directory do not clash.
    concat_list = f"{base_name}_concat_list.txt"

    spans = []
    for line in segments.splitlines():
        line = line.strip()
        if not line:
            continue
        start, end = validate_and_split_timestamps(line)
        if start is None or end is None:
            continue
        spans.append((start, end))
    # Fixed-width timestamps sort chronologically as strings; process the
    # segments in time order so the cursor below never moves backwards.
    spans.sort()

    temp_files = []
    current_start = "00:00:00.000"
    total_saved_time = 0
    try:
        for i, (start, end) in enumerate(spans):
            if end <= current_start:
                # Entirely inside audio that has already been removed.
                continue
            # Extract the non-sponsored audio before this sponsored segment.
            if current_start < start:
                part = f"{base_name}_part_{i}.mp3"
                # -y: overwrite leftovers instead of blocking on a prompt
                # that is invisible because stderr is captured.
                run_ffmpeg_command(["ffmpeg", "-y", "-i", audio_file, "-ss", current_start,
                                    "-to", start, "-c", "copy", part])
                temp_files.append(part)
            # Count only the portion not already removed by an overlapping
            # earlier segment.
            cut_from = max(current_start, start)
            total_saved_time += _timestamp_to_seconds(end) - _timestamp_to_seconds(cut_from)
            current_start = end

        # Extract the remaining audio after the last sponsored segment.
        part = f"{base_name}_part_end.mp3"
        run_ffmpeg_command(["ffmpeg", "-y", "-i", audio_file, "-ss", current_start,
                            "-c", "copy", part])
        temp_files.append(part)

        # Concatenate all parts. Entries are absolute because relative
        # entries are resolved against the list file's directory, and single
        # quotes are escaped for the list syntax.
        with open(concat_list, "w", encoding='utf-8') as f:
            for temp_file in temp_files:
                escaped = os.path.abspath(temp_file).replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        run_ffmpeg_command(["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", concat_list,
                            "-c", "copy", output_file])
    finally:
        # Best-effort cleanup: a part may be missing if its ffmpeg run failed.
        for leftover in temp_files + [concat_list]:
            try:
                os.remove(leftover)
            except OSError:
                pass
    return output_file, total_saved_time
def main(audio_file, srt_file=None):
    """Strip sponsored segments from a podcast MP3 using its SRT transcript.

    Args:
        audio_file: Path to the ``.mp3`` file to edit.
        srt_file: Path to the transcript; when omitted, the audio path with
            its extension replaced by ``.srt`` is used.

    Prints the detected segments, their count, the time saved and the name
    of the edited file. Returns early (after printing a message) when the
    audio path is not an ``.mp3`` or either file does not exist.
    """
    # Normalise the audio path and insist on an .mp3 input.
    audio_file = os.path.abspath(audio_file.strip())
    if not audio_file.lower().endswith('.mp3'):
        print("Error: The first argument must be an .mp3 file.")
        return

    # Fall back to the transcript that sits next to the audio file.
    if srt_file is None:
        srt_file = os.path.splitext(audio_file)[0] + ".srt"
    srt_file = os.path.abspath(srt_file.strip())

    # Both inputs must exist; the audio file is checked first.
    for required_path in (audio_file, srt_file):
        if not os.path.exists(required_path):
            print(f"File not found: {required_path}")
            return

    # Transcript -> cleaned lines -> keyword excerpts -> model timestamps.
    transcript_lines = clean_srt_lines(read_srt(srt_file))
    excerpt = extract_relevant_parts(transcript_lines)
    sponsored_time_stamps = get_sponsored_segments(excerpt)

    listing = sponsored_time_stamps.strip()
    print("Sponsored Segments Timestamps:")
    print(listing)
    num_segments = len(listing.split("\n")) if listing else 0
    print(f"Number of Segments: {num_segments}")

    edited_file, total_saved_time = remove_segments(audio_file, sponsored_time_stamps)

    whole_seconds = int(total_saved_time)
    minutes = whole_seconds // 60
    seconds = whole_seconds % 60
    print(f"Total Time Saved: {minutes} minutes and {seconds} seconds")
    print(f"Edited file created: {edited_file}")
if __name__ == "__main__":
    # Command-line entry point: a required audio path plus an optional
    # transcript path, both forwarded to main().
    cli = argparse.ArgumentParser(description="Remove sponsored segments from a podcast")
    cli.add_argument("audio_file", type=str, help="Path to the podcast audio file")
    cli.add_argument("srt_file", type=str, nargs='?', help="Path to the SRT transcription file")
    parsed = cli.parse_args()
    main(parsed.audio_file, parsed.srt_file)
Advertisement
Add Comment
Please, Sign In to add comment