vasylmartyniv

Remove Russian Songs

Oct 24th, 2025
282
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 9.47 KB | Music | 0 0
"""
Move 'Russian' songs from your Liked Songs into a separate playlist.

Detection heuristics:
 1) Cyrillic characters in the track name or any artist name.
    Note: the check looks only at the script, so titles/artists written in
    other Cyrillic-script languages are flagged as well.
 2) Artist genres contain a keyword from GENRE_KEYWORDS, such as 'russian',
    'russia', 'ru', 'soviet', 'cccp', 'рус', 'ру', etc.

By default no likes are removed. If you want to also unlike after moving,
set REMOVE_FROM_LIKED=True.
"""
  9.  
  10. import json
  11. import re
  12. import time
  13. import unicodedata
  14. from pathlib import Path
  15. from typing import List, Dict, Iterable
  16.  
  17. from dotenv import load_dotenv
  18. from spotipy import Spotify
  19. from spotipy.exceptions import SpotifyException
  20. from spotipy.oauth2 import SpotifyOAuth
  21.  
# Load environment variables from .env file
# (presumably the Spotify API credentials picked up by SpotifyOAuth — confirm)
load_dotenv()

# -------------------- CONFIG --------------------
# Name of the playlist that collects the detected tracks; it is looked up by
# exact name and created if missing.
PLAYLIST_NAME = "Russian Songs (from Liked)"
# Visibility used when the playlist has to be created.
PLAYLIST_PUBLIC = False
REMOVE_FROM_LIKED = False  # set True if you want to unlike after moving
# Number of items sent per batched add/remove call.
# NOTE(review): Spotify documents 100 items per playlist-add request, but the
# remove-saved-tracks endpoint is documented with a 50-ID maximum — confirm
# before relying on this value for unliking.
BATCH_SIZE = 100
# On-disk JSON cache of artist objects, relative to the working directory.
CACHE_FILE = Path(".spotify_cache.json")

# Keywords to flag in artist genres (lowercased).
# Each keyword is matched as a whole word against each lowercased genre string.
GENRE_KEYWORDS = {
    "russian", "russia", "ru", "рус", "ру", "soviet", "русский", "cccp", "russkiy", "russky",
    "russian hip hop", "russian rap", "russian pop", "russian rock"
}

# Optional: add specific artist IDs you want to always treat as Russian (override)
ALWAYS_RUSSIAN_ARTIST_IDS = set()  # e.g., {"0Xb4bTQ9C7k2rgnAru8v9A"}

# Optional: add specific artist IDs you want to exclude (never Russian)
NEVER_RUSSIAN_ARTIST_IDS = set()

# ------------------------------------------------
  45.  
  46. def has_cyrillic(text: str) -> bool:
  47.     if not text:
  48.         return False
  49.     for ch in text:
  50.         if "CYRILLIC" in unicodedata.name(ch, ""):
  51.             return True
  52.     return False
  53.  
  54. def chunked(iterable: Iterable, n: int) -> Iterable[List]:
  55.     buf = []
  56.     for item in iterable:
  57.         buf.append(item)
  58.         if len(buf) == n:
  59.             yield buf
  60.             buf = []
  61.     if buf:
  62.         yield buf
  63.  
  64. def safe_call(fn, *args, **kwargs):
  65.     """Retry on rate limits (HTTP 429)."""
  66.     while True:
  67.         try:
  68.             return fn(*args, **kwargs)
  69.         except SpotifyException as e:
  70.             if e.http_status == 429:
  71.                 retry_after = int(e.headers.get("Retry-After", "2"))
  72.                 time.sleep(retry_after + 1)
  73.                 continue
  74.             raise
  75.  
  76. def get_or_create_playlist(sp: Spotify, user_id: str, name: str, public: bool) -> str:
  77.     # Try to find existing playlist by exact name (first page or two is enough for most users)
  78.     results = safe_call(sp.current_user_playlists, limit=50)
  79.     while results:
  80.         for pl in results["items"]:
  81.             if pl["name"] == name and pl["owner"]["id"] == user_id:
  82.                 return pl["id"]
  83.         if results.get("next"):
  84.             results = safe_call(sp.next, results)
  85.         else:
  86.             break
  87.     # Create if not found
  88.     created = safe_call(sp.user_playlist_create, user=user_id, name=name, public=public, description="Auto-collected Russian songs from Liked")
  89.     return created["id"]
  90.  
  91. def classify_track(sp: Spotify, track_obj: Dict, artist_cache: Dict[str, Dict]) -> tuple[bool, str]:
  92.     """
  93.    Returns (True, reason) if the track is considered 'Russian' by heuristics.
  94.    Returns (False, "") otherwise.
  95.    """
  96.     track_name = (track_obj.get("name") or "").strip()
  97.  
  98.     # Heuristic 1: Cyrillic in track title
  99.     if has_cyrillic(track_name):
  100.         return True, f"Cyrillic in track title: '{track_name}'"
  101.  
  102.     artists = track_obj.get("artists") or []
  103.     artist_ids = [a["id"] for a in artists if a.get("id")]
  104.     artist_names = [a.get("name") or "" for a in artists]
  105.  
  106.     # Heuristic 2: Cyrillic in any artist name
  107.     for artist_name in artist_names:
  108.         if has_cyrillic(artist_name):
  109.             return True, f"Cyrillic in artist name: '{artist_name}'"
  110.  
  111.     # Respect explicit overrides
  112.     if any(aid in ALWAYS_RUSSIAN_ARTIST_IDS for aid in artist_ids):
  113.         return True, "Artist in ALWAYS_RUSSIAN_ARTIST_IDS"
  114.     if any(aid in NEVER_RUSSIAN_ARTIST_IDS for aid in artist_ids):
  115.         return False, ""
  116.  
  117.     # Heuristic 3: genre-based
  118.     # Fetch artist details (cached)
  119.     to_fetch = [aid for aid in artist_ids if aid and aid not in artist_cache]
  120.     for chunk in chunked(to_fetch, 50):
  121.         if chunk:
  122.             arts = safe_call(sp.artists, chunk)
  123.             for a in arts["artists"]:
  124.                 artist_cache[a["id"]] = a
  125.  
  126.     for aid in artist_ids:
  127.         a = artist_cache.get(aid)
  128.         if not a:
  129.             continue
  130.         artist_name = a.get("name", "Unknown")
  131.         genres = [g.lower() for g in (a.get("genres") or [])]
  132.         for genre in genres:
  133.             for keyword in GENRE_KEYWORDS:
  134.                 # Use word boundary matching to avoid false positives
  135.                 # e.g., "ru" shouldn't match "ruby" or "grunge"
  136.                 if re.search(r'\b' + re.escape(keyword) + r'\b', genre):
  137.                     return True, f"Genre match: '{genre}' (artist: {artist_name}, keyword: '{keyword}')"
  138.  
  139.     return False, ""
  140.  
  141. def load_cache() -> Dict:
  142.     if not CACHE_FILE.is_file():
  143.         return {}
  144.     with CACHE_FILE.open("r", encoding="utf-8") as f:
  145.         return json.load(f)
  146.  
  147. def save_cache(data: Dict):
  148.     with CACHE_FILE.open("w", encoding="utf-8") as f:
  149.         json.dump(data, f, ensure_ascii=False, indent=2)
  150.  
  151. def get_playlist_tracks(sp: Spotify, playlist_id: str) -> set:
  152.     """Get all track URIs currently in the playlist to avoid duplicates."""
  153.     track_uris = set()
  154.     results = safe_call(sp.playlist_items, playlist_id, limit=100)
  155.     while results:
  156.         for item in results.get("items", []):
  157.             track = item.get("track")
  158.             if track and track.get("uri"):
  159.                 track_uris.add(track["uri"])
  160.         if results.get("next"):
  161.             results = safe_call(sp.next, results)
  162.         else:
  163.             break
  164.     return track_uris
  165.  
  166. def main():
  167.     # Auth
  168.     scope = "user-library-read playlist-modify-private playlist-modify-public user-library-modify"
  169.     sp = Spotify(auth_manager=SpotifyOAuth(scope=scope))
  170.     me = safe_call(sp.current_user)
  171.     user_id = me["id"]
  172.  
  173.     # Load cached artist data
  174.     cache_data = load_cache()
  175.     artist_cache = cache_data.get("artists", {})
  176.  
  177.     playlist_id = get_or_create_playlist(sp, user_id, PLAYLIST_NAME, PLAYLIST_PUBLIC)
  178.  
  179.     print(f"Collecting Russian tracks from Liked Songs into playlist: {PLAYLIST_NAME}")
  180.     print(f"Playlist ID: {playlist_id}")
  181.  
  182.     # Get existing playlist tracks to avoid duplicates
  183.     print("Checking existing playlist contents...")
  184.     existing_tracks = get_playlist_tracks(sp, playlist_id)
  185.     print(f"Found {len(existing_tracks)} tracks already in playlist")
  186.  
  187.     # Iterate through liked tracks
  188.     russian_track_uris = []
  189.     results = safe_call(sp.current_user_saved_tracks, limit=50)
  190.     total = results.get("total", 0)
  191.     processed = 0
  192.  
  193.     while True:
  194.         items = results.get("items", [])
  195.         for item in items:
  196.             track = item.get("track")
  197.             if not track:
  198.                 continue
  199.             processed += 1
  200.  
  201.             uri = track.get("uri")
  202.  
  203.             # Skip if already in playlist
  204.             if uri in existing_tracks:
  205.                 continue
  206.  
  207.             try:
  208.                 is_russian, reason = classify_track(sp, track, artist_cache)
  209.                 if is_russian:
  210.                     track_name = track.get("name", "Unknown")
  211.                     artists_str = ", ".join([a.get("name", "") for a in track.get("artists", [])])
  212.                     print(f"✓ [{track_name}] by [{artists_str}] - {reason}")
  213.                     if uri:
  214.                         russian_track_uris.append(uri)
  215.             except Exception as e:
  216.                 # Log and keep going
  217.                 tname = (track.get("name") or "Unknown").encode("utf-8", "ignore")
  218.                 print(f"[WARN] Failed to classify track: {tname} | {e}")
  219.  
  220.         if processed % 100 == 0:
  221.             print(f"Processed {processed}/{total} liked tracks...")
  222.             time.sleep(0.5)  # Small delay every 100 tracks to avoid rate limits
  223.  
  224.         if results.get("next"):
  225.             results = safe_call(sp.next, results)
  226.         else:
  227.             break
  228.  
  229.     print(f"Processed {processed}/{total} liked tracks total")
  230.  
  231.     # Add to playlist in batches
  232.     if russian_track_uris:
  233.         print(f"Found {len(russian_track_uris)} new Russian tracks to add")
  234.         added = 0
  235.         for chunk in chunked(russian_track_uris, BATCH_SIZE):
  236.             safe_call(sp.playlist_add_items, playlist_id, chunk)
  237.             added += len(chunk)
  238.             print(f"Added {added}/{len(russian_track_uris)} to playlist")
  239.             time.sleep(0.3)  # Small delay between batches
  240.  
  241.         # Optional: unlike after moving
  242.         if REMOVE_FROM_LIKED:
  243.             removed = 0
  244.             # Convert URIs to IDs
  245.             ids = [uri.split(":")[-1] for uri in russian_track_uris]
  246.             for chunk in chunked(ids, BATCH_SIZE):
  247.                 safe_call(sp.current_user_saved_tracks_delete, chunk)
  248.                 removed += len(chunk)
  249.                 print(f"Removed {removed}/{len(ids)} from Liked Songs")
  250.                 time.sleep(0.3)  # Small delay between batches
  251.  
  252.         print(f"Done. Moved {len(russian_track_uris)} new tracks to '{PLAYLIST_NAME}'.")
  253.     else:
  254.         print("No new Russian tracks to add.")
  255.  
  256.     # Save updated cache
  257.     save_cache({"artists": artist_cache})
  258.  
# Run the script only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
  261.  
Advertisement
Add Comment
Please, Sign In to add comment