Advertisement
Guest User

sponsorblock.py

a guest
Jan 21st, 2025
17
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.39 KB | None | 0 0
  1. import os
  2. import hashlib
  3. import requests
  4. import json
  5. from pathlib import Path
  6. import re
  7. import subprocess
  8. import urllib.parse
  9.  
  10. def hash_video_id(video_id):
  11.     """Create SHA256 hash of video ID and return first 4 characters"""
  12.     hash_object = hashlib.sha256(video_id.encode('utf-8'))
  13.     return hash_object.hexdigest()[:4]
  14.  
  15. def get_sponsorblock_segments(video_id):
  16.     print(f"Fetching SponsorBlock data for {video_id}...")
  17.    
  18.     # Get hash prefix
  19.     hash_prefix = hash_video_id(video_id)
  20.    
  21.     # Categories to fetch
  22.     categories = [
  23.         "sponsor", "selfpromo", "exclusive_access", "interaction",
  24.         "poi_highlight", "intro", "outro", "preview", "filler",
  25.         "chapter", "music_offtopic"
  26.     ]
  27.    
  28.     # Action types to fetch
  29.     action_types = ["skip", "mute", "chapter", "full", "poi"]
  30.    
  31.     # Construct URL with proper encoding
  32.     base_url = "https://sponsor.ajay.app/api/skipSegments"
  33.     params = {
  34.         'categories': json.dumps(categories),
  35.         'actionTypes': json.dumps(action_types)
  36.     }
  37.     url = f"{base_url}/{hash_prefix}?{urllib.parse.urlencode(params)}"
  38.    
  39.     headers = {
  40.         'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:128.0) Gecko/20100101 Firefox/128.0',
  41.         'Accept': '*/*',
  42.         'Content-Type': 'application/json'
  43.     }
  44.    
  45.     try:
  46.         response = requests.get(url, headers=headers)
  47.         response.raise_for_status()
  48.        
  49.         # Get all segments for videos with matching hash prefix
  50.         all_videos = response.json()
  51.        
  52.         # Find our specific video
  53.         for video in all_videos:
  54.             if video.get('videoID') == video_id:
  55.                 segments = []
  56.                 for segment in video.get('segments', []):
  57.                     if segment.get('actionType') == 'skip':  # Only process 'skip' segments
  58.                         segments.append({
  59.                             'start_time': segment['segment'][0],
  60.                             'end_time': segment['segment'][1],
  61.                             'category': segment['category']
  62.                         })
  63.                 print(f"Found {len(segments)} segments to remove")
  64.                 return segments
  65.        
  66.         print("No segments found for this video")
  67.         return []
  68.        
  69.     except requests.exceptions.RequestException as e:
  70.         print(f"Error fetching SponsorBlock data: {e}")
  71.         return []
  72.     except Exception as e:
  73.         print(f"Error processing SponsorBlock data: {e}")
  74.         print(f"Response content: {response.text[:500]}...")  # Print first 500 chars of response
  75.         return []
  76.  
  77. def get_video_id(filename):
  78.     # Extract video ID from filename (assumes ID is in square brackets at the end)
  79.     match = re.search(r'\[([a-zA-Z0-9_-]+)\]', filename)
  80.     if match:
  81.         return match.group(1)
  82.     return None
  83.  
  84. def cut_video_segments(input_file, output_file, segments):
  85.     print(f"Running ffmpeg to remove {len(segments)} segments...")
  86.    
  87.     # Create output directory if it doesn't exist
  88.     output_path = Path(output_file).absolute()
  89.     os.makedirs(output_path.parent, exist_ok=True)
  90.    
  91.     # Get video duration
  92.     duration = get_video_duration(input_file)
  93.     if not duration:
  94.         print(f"Couldn't get duration for {input_file}")
  95.         return
  96.    
  97.     # Create inverse segments (the parts we want to keep)
  98.     inverse_segments = []
  99.     current_time = 0
  100.    
  101.     # Sort segments by start time
  102.     sorted_segments = sorted(segments, key=lambda x: x['start_time'])
  103.    
  104.     for segment in sorted_segments:
  105.         if segment['start_time'] > current_time:
  106.             inverse_segments.append({
  107.                 'start_time': current_time,
  108.                 'end_time': segment['start_time']
  109.             })
  110.         current_time = segment['end_time']
  111.    
  112.     # Add final segment if needed
  113.     if current_time < duration:
  114.         inverse_segments.append({
  115.             'start_time': current_time,
  116.             'end_time': duration
  117.         })
  118.  
  119.     print(f"Created {len(inverse_segments)} segments to keep")
  120.    
  121.     # Create temporary directory for segments
  122.     temp_dir = Path("temp_segments").absolute()
  123.     if temp_dir.exists():
  124.         # Clean up any existing files
  125.         for file in temp_dir.glob("*"):
  126.             file.unlink()
  127.     else:
  128.         temp_dir.mkdir()
  129.    
  130.     # Split video into segments
  131.     segment_files = []
  132.     for i, segment in enumerate(inverse_segments):
  133.         segment_file = temp_dir / f"segment_{i}.mkv"
  134.         segment_files.append(segment_file)
  135.        
  136.         command = [
  137.             'ffmpeg',
  138.             '-i', str(Path(input_file).absolute()),
  139.             '-ss', str(segment['start_time']),
  140.             '-to', str(segment['end_time']),
  141.             '-c', 'copy',
  142.             '-avoid_negative_ts', '1',
  143.             str(segment_file)
  144.         ]
  145.        
  146.         print(f"Extracting segment {i}: {segment['start_time']} to {segment['end_time']}")
  147.         try:
  148.             result = subprocess.run(command, capture_output=True, text=True)
  149.             if result.returncode != 0:
  150.                 print(f"FFmpeg error output: {result.stderr}")
  151.                 return
  152.         except subprocess.CalledProcessError as e:
  153.             print(f"Error extracting segment {i}: {e}")
  154.             print(f"FFmpeg error output: {e.stderr}")
  155.             return
  156.  
  157.     # Create concat file
  158.     concat_file = temp_dir / "concat.txt"
  159.     with open(concat_file, 'w') as f:
  160.         for segment_file in segment_files:
  161.             # Properly escape the path
  162.             escaped_path = str(segment_file).replace("'", "'\\''")
  163.             f.write(f"file '{escaped_path}'\n")
  164.  
  165.     # Debug: Show concat file contents
  166.     print("\nConcat file contents:")
  167.     with open(concat_file, 'r') as f:
  168.         print(f.read())
  169.  
  170.     # Modify the concat command to add progress handling
  171.     concat_command = [
  172.         'ffmpeg',
  173.         '-y',  # Overwrite output file without asking
  174.         '-f', 'concat',
  175.         '-safe', '0',
  176.         '-i', str(concat_file),
  177.         '-c', 'copy',
  178.         '-loglevel', 'error',  # Reduce verbosity
  179.         '-progress', 'pipe:1',  # Force progress output
  180.         str(output_path)
  181.     ]
  182.    
  183.     print("\nRunning concat command...")
  184.     try:
  185.         # Use Popen instead of run to handle output streams
  186.         process = subprocess.Popen(
  187.             concat_command,
  188.             stdout=subprocess.PIPE,
  189.             stderr=subprocess.STDOUT,
  190.             universal_newlines=True
  191.         )
  192.        
  193.         # Read output in real-time
  194.         while True:
  195.             output = process.stdout.readline()
  196.             if output == '' and process.poll() is not None:
  197.                 break
  198.             if output:
  199.                 print(output.strip())
  200.        
  201.         if process.returncode != 0:
  202.             print(f"Concat failed with code {process.returncode}")
  203.             return
  204.         print(f"Successfully processed {input_file}")
  205.     finally:
  206.         # Clean up temporary files
  207.         for file in segment_files:
  208.             try:
  209.                 file.unlink()
  210.             except Exception as e:
  211.                 print(f"Error removing {file}: {e}")
  212.         try:
  213.             concat_file.unlink()
  214.             temp_dir.rmdir()
  215.         except Exception as e:
  216.             print(f"Error cleaning up temp dir: {e}")
  217.  
  218. def get_video_duration(filename):
  219.     cmd = [
  220.         'ffprobe',
  221.         '-v', 'error',
  222.         '-show_entries', 'format=duration',
  223.         '-of', 'default=noprint_wrappers=1:nokey=1',
  224.         filename
  225.     ]
  226.     try:
  227.         result = subprocess.run(cmd, capture_output=True, text=True)
  228.         return float(result.stdout.strip())
  229.     except:
  230.         return None
  231.  
  232. def main():
  233.     # Create output directory
  234.     output_dir = Path('./sponsorblocked')
  235.     output_dir.mkdir(exist_ok=True)
  236.    
  237.     # Process all video files in current directory
  238.     for video_file in Path('.').glob('*.webm'):
  239.         video_id = get_video_id(video_file.name)
  240.         if not video_id:
  241.             print(f"Couldn't extract video ID from {video_file.name}")
  242.             continue
  243.            
  244.         segments = get_sponsorblock_segments(video_id)
  245.         if not segments:
  246.             print(f"No segments to remove for {video_file.name}")
  247.             continue
  248.            
  249.         output_file = output_dir / video_file.name
  250.         cut_video_segments(str(video_file), str(output_file), segments)
  251.  
  252. if __name__ == '__main__':
  253.     main()
  254.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement