Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import hashlib
- import requests
- import json
- from pathlib import Path
- import re
- import subprocess
- import urllib.parse
def hash_video_id(video_id):
    """Return the first four hex digits of the SHA-256 digest of ``video_id``.

    The ID is encoded as UTF-8 before hashing.
    """
    digest = hashlib.sha256(video_id.encode('utf-8')).hexdigest()
    return digest[:4]
def get_sponsorblock_segments(video_id, timeout=30):
    """Fetch the SponsorBlock "skip" segments recorded for one video.

    Only the 4-character hash prefix of the ID (see ``hash_video_id``) is sent
    to the server; the response lists every video sharing that prefix and the
    entry whose ``videoID`` equals ``video_id`` is selected locally.

    Args:
        video_id: ID of the video to look up.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A list of dicts with ``start_time``, ``end_time`` and ``category``
        keys, one per segment whose ``actionType`` is ``"skip"``. The list is
        empty when the video has no entry or when any error occurs (errors
        are printed, never raised).
    """
    print(f"Fetching SponsorBlock data for {video_id}...")

    hash_prefix = hash_video_id(video_id)

    # Categories to fetch
    categories = [
        "sponsor", "selfpromo", "exclusive_access", "interaction",
        "poi_highlight", "intro", "outro", "preview", "filler",
        "chapter", "music_offtopic",
    ]
    # Action types to fetch (only "skip" entries are used below)
    action_types = ["skip", "mute", "chapter", "full", "poi"]

    # The API expects both lists as JSON arrays in the query string.
    base_url = "https://sponsor.ajay.app/api/skipSegments"
    params = {
        'categories': json.dumps(categories),
        'actionTypes': json.dumps(action_types),
    }
    url = f"{base_url}/{hash_prefix}?{urllib.parse.urlencode(params)}"

    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0',
        'Accept': '*/*',
        'Content-Type': 'application/json',
    }

    # Bound up front so the generic handler below can tell whether a response
    # was ever received.
    response = None
    try:
        # Without a timeout a stalled connection would block the run forever.
        response = requests.get(url, headers=headers, timeout=timeout)

        # NOTE(review): the SponsorBlock API documents 404 as "nothing found
        # for this hash prefix" -- treat it as an empty result, not a failure.
        if response.status_code == 404:
            print("No segments found for this video")
            return []
        response.raise_for_status()

        # The payload covers all videos with a matching hash prefix.
        all_videos = response.json()
        for video in all_videos:
            if video.get('videoID') != video_id:
                continue
            segments = [
                {
                    'start_time': segment['segment'][0],
                    'end_time': segment['segment'][1],
                    'category': segment['category'],
                }
                for segment in video.get('segments', [])
                if segment.get('actionType') == 'skip'
            ]
            print(f"Found {len(segments)} segments to remove")
            return segments

        print("No segments found for this video")
        return []
    except requests.exceptions.RequestException as e:
        print(f"Error fetching SponsorBlock data: {e}")
        return []
    except Exception as e:
        # Best-effort boundary: malformed payloads must not abort the batch.
        print(f"Error processing SponsorBlock data: {e}")
        if response is not None:
            # First 500 characters are enough to diagnose a bad payload.
            print(f"Response content: {response.text[:500]}...")
        return []
def get_video_id(filename):
    """Extract the video ID from a file name.

    The ID is expected in square brackets at the end of the name, e.g.
    ``Title [dQw4w9WgXcQ].webm``. When the name contains several bracketed
    tokens the last one is returned, so a tag such as ``[HD]`` earlier in the
    title is not mistaken for the ID.

    Args:
        filename: File name (not a full path) to inspect.

    Returns:
        The bracketed ID string, or ``None`` when no bracketed token made of
        letters, digits, ``_`` or ``-`` is present.
    """
    candidates = re.findall(r'\[([a-zA-Z0-9_-]+)\]', filename)
    return candidates[-1] if candidates else None
def _compute_keep_ranges(segments, duration):
    """Return the time ranges left over once ``segments`` are removed."""
    keep_ranges = []
    cursor = 0
    for segment in sorted(segments, key=lambda s: s['start_time']):
        # A segment starting past the end of the file removes nothing there.
        start = min(segment['start_time'], duration)
        if start > cursor:
            keep_ranges.append({'start_time': cursor, 'end_time': start})
        # max(): an overlapping or nested segment that ends earlier must not
        # move the cursor backwards into already-removed footage.
        cursor = max(cursor, segment['end_time'])
    if cursor < duration:
        keep_ranges.append({'start_time': cursor, 'end_time': duration})
    return keep_ranges


def _remove_temp_files(segment_files, concat_file, temp_dir):
    """Delete the temporary segment files, concat list and directory."""
    for file in segment_files:
        try:
            # ffmpeg may have failed before creating the file.
            if file.exists():
                file.unlink()
        except Exception as e:
            print(f"Error removing {file}: {e}")
    try:
        if concat_file.exists():
            concat_file.unlink()
        temp_dir.rmdir()
    except Exception as e:
        print(f"Error cleaning up temp dir: {e}")


def cut_video_segments(input_file, output_file, segments):
    """Write a copy of ``input_file`` with the given segments cut out.

    The parts to keep are extracted with ``ffmpeg -c copy`` into a
    ``temp_segments`` directory under the current working directory and then
    joined with ffmpeg's concat demuxer. Temporary files are removed whether
    or not the run succeeds. Failures are printed and the function returns
    without raising.

    Args:
        input_file: Path of the source video.
        output_file: Path of the video to create; parent directories are
            created if missing and an existing file is overwritten.
        segments: Iterable of dicts with numeric ``start_time`` and
            ``end_time`` keys (seconds) describing the parts to remove.

    Returns:
        None.
    """
    print(f"Running ffmpeg to remove {len(segments)} segments...")

    # Create output directory if it doesn't exist
    output_path = Path(output_file).absolute()
    os.makedirs(output_path.parent, exist_ok=True)

    duration = get_video_duration(input_file)
    if not duration:
        print(f"Couldn't get duration for {input_file}")
        return

    # Invert the removal list into the ranges we want to keep.
    inverse_segments = _compute_keep_ranges(segments, duration)
    print(f"Created {len(inverse_segments)} segments to keep")
    if not inverse_segments:
        # An empty concat list would only make ffmpeg fail later.
        print(f"Nothing left to keep for {input_file}")
        return

    # Create temporary directory for segments, clearing leftovers of a
    # previous run.
    temp_dir = Path("temp_segments").absolute()
    if temp_dir.exists():
        for file in temp_dir.glob("*"):
            file.unlink()
    else:
        temp_dir.mkdir()

    segment_files = []
    concat_file = temp_dir / "concat.txt"
    try:
        # Split video into segments
        for i, segment in enumerate(inverse_segments):
            segment_file = temp_dir / f"segment_{i}.mkv"
            segment_files.append(segment_file)
            command = [
                'ffmpeg',
                '-i', str(Path(input_file).absolute()),
                '-ss', str(segment['start_time']),
                '-to', str(segment['end_time']),
                '-c', 'copy',
                '-avoid_negative_ts', '1',
                str(segment_file),
            ]
            print(f"Extracting segment {i}: {segment['start_time']} to {segment['end_time']}")
            try:
                result = subprocess.run(command, capture_output=True, text=True)
            except OSError as e:
                # Raised when ffmpeg is missing or cannot be executed.
                print(f"Error extracting segment {i}: {e}")
                return
            if result.returncode != 0:
                print(f"FFmpeg error output: {result.stderr}")
                return

        # Create concat file; single quotes in paths are escaped the way the
        # concat demuxer expects.
        with open(concat_file, 'w', encoding='utf-8') as f:
            for segment_file in segment_files:
                escaped_path = str(segment_file).replace("'", "'\\''")
                f.write(f"file '{escaped_path}'\n")

        # Debug: Show concat file contents
        print("\nConcat file contents:")
        with open(concat_file, 'r', encoding='utf-8') as f:
            print(f.read())

        concat_command = [
            'ffmpeg',
            '-y',  # Overwrite output file without asking
            '-f', 'concat',
            '-safe', '0',  # Allow the absolute paths written above
            '-i', str(concat_file),
            '-c', 'copy',
            '-loglevel', 'error',  # Reduce verbosity
            '-progress', 'pipe:1',  # Force progress output
            str(output_path),
        ]
        print("\nRunning concat command...")

        # The context manager closes the pipe and waits for ffmpeg to exit,
        # so returncode is populated afterwards.
        with subprocess.Popen(
            concat_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        ) as process:
            # Relay ffmpeg's output line by line as it is produced.
            for line in process.stdout:
                print(line.strip())

        if process.returncode != 0:
            print(f"Concat failed with code {process.returncode}")
            return
        print(f"Successfully processed {input_file}")
    finally:
        # Clean up temporary files on every exit path, including failed
        # extractions.
        _remove_temp_files(segment_files, concat_file, temp_dir)
def get_video_duration(filename):
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Args:
        filename: Path of the media file to probe.

    Returns:
        The duration as a float, or ``None`` when ffprobe cannot be run or
        does not print a number (missing file, unreadable container, ...).
    """
    cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_entries', 'format=duration',
        # Print only the bare value, without section wrappers or key names.
        '-of', 'default=noprint_wrappers=1:nokey=1',
        filename,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
        return float(result.stdout.strip())
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: ffprobe missing or not executable; ValueError: empty or
        # non-numeric output. A bare except would also swallow
        # KeyboardInterrupt and SystemExit.
        return None
def main():
    """Strip SponsorBlock segments from every ``*.webm`` in the current directory.

    Each file's video ID is read from its name, the segments for that ID are
    fetched, and the trimmed copy is written under ``./sponsorblocked`` with
    the same file name. Files without an ID or without segments are skipped
    with a message.
    """
    destination = Path('./sponsorblocked')
    destination.mkdir(exist_ok=True)

    for source in Path('.').glob('*.webm'):
        name = source.name

        video_id = get_video_id(name)
        if not video_id:
            print(f"Couldn't extract video ID from {name}")
            continue

        segments = get_sponsorblock_segments(video_id)
        if segments:
            target = destination / name
            cut_video_segments(str(source), str(target), segments)
        else:
            print(f"No segments to remove for {name}")


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement