Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
"""
Move 'Russian' songs from your Liked Songs into a separate playlist.

Detection heuristics:
1) Cyrillic characters in the track name or in any artist name.
2) Artist genres that contain keywords such as 'russian', 'russia', 'soviet', 'cccp', 'рус', 'ру', etc.

No likes are removed by default. To also unlike tracks after moving them, set REMOVE_FROM_LIKED = True.
"""
- import json
- import re
- import time
- import unicodedata
- from pathlib import Path
- from typing import List, Dict, Iterable
- from dotenv import load_dotenv
- from spotipy import Spotify
- from spotipy.exceptions import SpotifyException
- from spotipy.oauth2 import SpotifyOAuth
# Load environment variables from a local .env file at import time, before
# main() builds any client.
# NOTE(review): presumably this supplies the Spotify OAuth credentials, since
# main() constructs SpotifyOAuth with only a scope — confirm.
load_dotenv()
# ==================== SETTINGS ====================

# Destination playlist; it is matched by exact name and created when missing.
PLAYLIST_NAME = "Russian Songs (from Liked)"

# Visibility used when the playlist has to be created.
PLAYLIST_PUBLIC = False

# Set to True to also remove the moved tracks from Liked Songs.
REMOVE_FROM_LIKED = False

# Number of items sent per batched Spotify call.
BATCH_SIZE = 100

# JSON file in which fetched artist details are kept between runs.
CACHE_FILE = Path(".spotify_cache.json")

# Lower-case terms searched for, as whole words, inside artist genre strings.
GENRE_KEYWORDS = {
    "russian",
    "russia",
    "ru",
    "рус",
    "ру",
    "soviet",
    "русский",
    "cccp",
    "russkiy",
    "russky",
    "russian hip hop",
    "russian rap",
    "russian pop",
    "russian rock",
}

# Artist IDs that always count as Russian, e.g. {"0Xb4bTQ9C7k2rgnAru8v9A"}.
ALWAYS_RUSSIAN_ARTIST_IDS = set()

# Artist IDs that must never count as Russian.
NEVER_RUSSIAN_ARTIST_IDS = set()

# ==================================================
def has_cyrillic(text: str) -> bool:
    """Report whether *text* contains at least one Cyrillic character.

    A character counts as Cyrillic when its Unicode name contains the word
    "CYRILLIC"; characters that have no Unicode name are ignored. Empty or
    ``None`` input yields ``False``.
    """
    if text:
        return any("CYRILLIC" in unicodedata.name(symbol, "") for symbol in text)
    return False
def chunked(iterable: Iterable, n: int) -> Iterable[List]:
    """Lazily split *iterable* into consecutive lists of *n* items.

    Every yielded list holds exactly *n* items except possibly the last one,
    which carries whatever is left over. Nothing is yielded for an empty
    iterable.
    """
    batch = []
    for element in iterable:
        batch.append(element)
        if len(batch) != n:
            continue
        yield batch
        batch = []
    # Flush the trailing, shorter-than-n remainder.
    if batch:
        yield batch
def safe_call(fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)``, retrying for as long as it is rate limited.

    Args:
        fn: Callable to invoke (typically a bound Spotify client method).
        *args: Positional arguments forwarded to *fn*.
        **kwargs: Keyword arguments forwarded to *fn*.

    Returns:
        Whatever *fn* returns on its first call that is not rate limited.

    Raises:
        SpotifyException: Re-raised unchanged for any status other than 429.
    """
    default_wait = 2  # seconds; used when Retry-After is missing or unusable
    while True:
        try:
            return fn(*args, **kwargs)
        except SpotifyException as e:
            if e.http_status != 429:
                raise
            headers = getattr(e, "headers", None) or {}
            # A malformed header must not turn a retryable rate limit into a
            # crash inside the handler, so fall back to the default wait.
            try:
                retry_after = int(headers.get("Retry-After", default_wait))
            except (TypeError, ValueError):
                retry_after = default_wait
            # time.sleep() rejects negative values; one extra second is kept
            # as a safety margin on top of the advertised delay.
            time.sleep(max(retry_after, 0) + 1)
def get_or_create_playlist(sp: Spotify, user_id: str, name: str, public: bool) -> str:
    """Return the ID of the user's playlist called *name*, creating it if absent.

    Every page of the current user's playlists is scanned for one whose name
    equals *name* exactly and whose owner ID is *user_id*. When none is found,
    a new playlist is created with the requested visibility.

    Args:
        sp: Spotify client used for all calls.
        user_id: ID of the user who must own the playlist.
        name: Exact playlist name to look for or create.
        public: Visibility passed on when the playlist is created.

    Returns:
        The ID of the matching or newly created playlist.
    """
    page = safe_call(sp.current_user_playlists, limit=50)
    while page:
        for playlist in page["items"]:
            if playlist["name"] != name:
                continue
            if playlist["owner"]["id"] == user_id:
                return playlist["id"]
        if not page.get("next"):
            break
        page = safe_call(sp.next, page)

    # Nothing matched on any page, so make a fresh playlist.
    new_playlist = safe_call(
        sp.user_playlist_create,
        user=user_id,
        name=name,
        public=public,
        description="Auto-collected Russian songs from Liked",
    )
    return new_playlist["id"]
def classify_track(sp: Spotify, track_obj: Dict, artist_cache: Dict[str, Dict]) -> tuple[bool, str]:
    """Decide whether a track counts as 'Russian' and explain why.

    Checks run in this order; the first one that decides wins:

    1. Explicit overrides: any artist in ``ALWAYS_RUSSIAN_ARTIST_IDS`` makes
       the track Russian; otherwise any artist in ``NEVER_RUSSIAN_ARTIST_IDS``
       makes it non-Russian. Overrides are consulted first so that they really
       take precedence over the heuristics below.
    2. Cyrillic characters in the track title.
    3. Cyrillic characters in any artist name.
    4. A whole-word ``GENRE_KEYWORDS`` match in any artist's genres.

    Args:
        sp: Spotify client used to fetch artist details that are not cached.
        track_obj: Track dictionary; its ``name`` and ``artists`` keys are read.
        artist_cache: Maps artist ID to artist details. It is updated in place
            with every artist fetched during this call.

    Returns:
        ``(True, reason)`` when the track is considered Russian, else
        ``(False, "")``.
    """
    artists = track_obj.get("artists") or []
    artist_ids = [a["id"] for a in artists if a.get("id")]

    # Explicit overrides beat every heuristic, including the Cyrillic checks.
    if any(aid in ALWAYS_RUSSIAN_ARTIST_IDS for aid in artist_ids):
        return True, "Artist in ALWAYS_RUSSIAN_ARTIST_IDS"
    if any(aid in NEVER_RUSSIAN_ARTIST_IDS for aid in artist_ids):
        return False, ""

    # Heuristic 1: Cyrillic in track title
    track_name = (track_obj.get("name") or "").strip()
    if has_cyrillic(track_name):
        return True, f"Cyrillic in track title: '{track_name}'"

    # Heuristic 2: Cyrillic in any artist name
    for artist in artists:
        artist_name = artist.get("name") or ""
        if has_cyrillic(artist_name):
            return True, f"Cyrillic in artist name: '{artist_name}'"

    # Heuristic 3: genre-based. Fetch details for artists not cached yet;
    # dict.fromkeys drops duplicate IDs while keeping their order.
    to_fetch = list(dict.fromkeys(aid for aid in artist_ids if aid not in artist_cache))
    for chunk in chunked(to_fetch, 50):
        response = safe_call(sp.artists, chunk) or {}
        for fetched in response.get("artists") or []:
            # Skip empty entries instead of crashing on fetched["id"].
            if fetched and fetched.get("id"):
                artist_cache[fetched["id"]] = fetched

    # Word-boundary patterns keep short keywords such as "ru" from matching
    # inside longer words ("grunge"). Sorting makes the reported keyword
    # independent of set iteration order.
    keyword_patterns = [
        (keyword, re.compile(r'\b' + re.escape(keyword) + r'\b'))
        for keyword in sorted(GENRE_KEYWORDS)
    ]
    for aid in artist_ids:
        details = artist_cache.get(aid)
        if not details:
            continue
        artist_name = details.get("name", "Unknown")
        for genre in (g.lower() for g in (details.get("genres") or [])):
            for keyword, pattern in keyword_patterns:
                if pattern.search(genre):
                    return True, f"Genre match: '{genre}' (artist: {artist_name}, keyword: '{keyword}')"
    return False, ""
def load_cache() -> Dict:
    """Read the cache stored at ``CACHE_FILE``.

    The cache is only an optimisation, so an unusable file never aborts the
    run: a missing, unreadable, corrupt, or non-dictionary cache is treated
    as empty.

    Returns:
        The decoded JSON object, or ``{}`` when no usable cache exists.
    """
    if not CACHE_FILE.is_file():
        return {}
    try:
        with CACHE_FILE.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"[WARN] Ignoring unreadable cache file {CACHE_FILE}: {exc}")
        return {}
    # Callers use dict methods on the result, so reject any other JSON type.
    return data if isinstance(data, dict) else {}
def save_cache(data: Dict):
    """Write *data* to ``CACHE_FILE`` as indented UTF-8 JSON.

    The data is serialised before any file is touched and written to a
    temporary sibling file that then replaces the cache, so a serialisation
    error or an interrupted write cannot leave a truncated cache behind.

    Args:
        data: JSON-serialisable dictionary to store.

    Raises:
        TypeError: If *data* contains values that JSON cannot represent.
        OSError: If the file cannot be written or replaced.
    """
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    tmp_path = CACHE_FILE.with_name(CACHE_FILE.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    # Path.replace swaps the finished file into place in one step.
    tmp_path.replace(CACHE_FILE)
def get_playlist_tracks(sp: Spotify, playlist_id: str) -> set:
    """Collect the URIs of all tracks currently in a playlist.

    The playlist is read page by page; entries without a track or without a
    URI are skipped. The result is used by the caller to avoid adding
    duplicates.

    Args:
        sp: Spotify client used for all calls.
        playlist_id: ID of the playlist to read.

    Returns:
        Set of track URIs found in the playlist.
    """
    seen_uris = set()
    page = safe_call(sp.playlist_items, playlist_id, limit=100)
    while page:
        for entry in page.get("items", []):
            uri = (entry.get("track") or {}).get("uri")
            if uri:
                seen_uris.add(uri)
        if not page.get("next"):
            break
        page = safe_call(sp.next, page)
    return seen_uris
# The saved-tracks removal endpoint accepts fewer IDs per request than the
# playlist endpoints, so unliking must not use BATCH_SIZE unchecked.
_MAX_UNLIKE_BATCH = 50


def _print_safely(message: str) -> None:
    """Print *message*, escaping characters the console encoding cannot show."""
    try:
        print(message)
    except UnicodeEncodeError:
        print(message.encode("ascii", "backslashreplace").decode("ascii"))


def _collect_new_russian_uris(sp: Spotify, artist_cache: Dict[str, Dict], existing_tracks: set) -> List[str]:
    """Scan Liked Songs and return URIs of Russian tracks not yet in the playlist."""
    russian_track_uris = []
    results = safe_call(sp.current_user_saved_tracks, limit=50)
    total = results.get("total", 0)
    processed = 0
    while True:
        for item in results.get("items", []):
            track = item.get("track")
            if not track:
                continue
            processed += 1
            uri = track.get("uri")
            # Tracks already in the playlist need no classification.
            if uri not in existing_tracks:
                try:
                    is_russian, reason = classify_track(sp, track, artist_cache)
                except Exception as e:
                    # Best effort: log the failure and keep going.
                    tname = track.get("name") or "Unknown"
                    _print_safely(f"[WARN] Failed to classify track: {tname} | {e}")
                else:
                    if is_russian:
                        # Record the URI before printing so that a display
                        # problem can never drop a matching track.
                        if uri:
                            russian_track_uris.append(uri)
                        track_name = track.get("name", "Unknown")
                        artists_str = ", ".join(a.get("name") or "" for a in track.get("artists") or [])
                        _print_safely(f"✓ [{track_name}] by [{artists_str}] - {reason}")
            if processed % 100 == 0:
                print(f"Processed {processed}/{total} liked tracks...")
                time.sleep(0.5)  # Small delay every 100 tracks to avoid rate limits
        if results.get("next"):
            results = safe_call(sp.next, results)
        else:
            break
    print(f"Processed {processed}/{total} liked tracks total")
    return russian_track_uris


def _add_to_playlist(sp: Spotify, playlist_id: str, uris: List[str]) -> None:
    """Add *uris* to the playlist in batches of ``BATCH_SIZE``."""
    added = 0
    for chunk in chunked(uris, BATCH_SIZE):
        safe_call(sp.playlist_add_items, playlist_id, chunk)
        added += len(chunk)
        print(f"Added {added}/{len(uris)} to playlist")
        time.sleep(0.3)  # Small delay between batches


def _remove_from_liked(sp: Spotify, uris: List[str]) -> None:
    """Remove the tracks behind *uris* from Liked Songs in API-sized batches."""
    removed = 0
    # The track ID is the last colon-separated part of the URI.
    ids = [uri.split(":")[-1] for uri in uris]
    for chunk in chunked(ids, min(BATCH_SIZE, _MAX_UNLIKE_BATCH)):
        safe_call(sp.current_user_saved_tracks_delete, chunk)
        removed += len(chunk)
        print(f"Removed {removed}/{len(ids)} from Liked Songs")
        time.sleep(0.3)  # Small delay between batches


def main():
    """Copy Russian tracks from Liked Songs into the configured playlist.

    Steps: authenticate, load the artist cache, find or create the playlist,
    classify every liked track that is not already in the playlist, add the
    matches, and unlike them when ``REMOVE_FROM_LIKED`` is set. The artist
    cache is written back even when a later step fails, so fetched artist
    details are not lost.
    """
    # Auth
    scope = "user-library-read playlist-modify-private playlist-modify-public user-library-modify"
    sp = Spotify(auth_manager=SpotifyOAuth(scope=scope))
    me = safe_call(sp.current_user)
    user_id = me["id"]

    # Load cached artist data; ignore an entry of the wrong type.
    cache_data = load_cache()
    artist_cache = cache_data.get("artists", {})
    if not isinstance(artist_cache, dict):
        artist_cache = {}

    try:
        playlist_id = get_or_create_playlist(sp, user_id, PLAYLIST_NAME, PLAYLIST_PUBLIC)
        print(f"Collecting Russian tracks from Liked Songs into playlist: {PLAYLIST_NAME}")
        print(f"Playlist ID: {playlist_id}")

        # Get existing playlist tracks to avoid duplicates
        print("Checking existing playlist contents...")
        existing_tracks = get_playlist_tracks(sp, playlist_id)
        print(f"Found {len(existing_tracks)} tracks already in playlist")

        russian_track_uris = _collect_new_russian_uris(sp, artist_cache, existing_tracks)

        if russian_track_uris:
            print(f"Found {len(russian_track_uris)} new Russian tracks to add")
            _add_to_playlist(sp, playlist_id, russian_track_uris)
            # Optional: unlike after moving
            if REMOVE_FROM_LIKED:
                _remove_from_liked(sp, russian_track_uris)
            print(f"Done. Moved {len(russian_track_uris)} new tracks to '{PLAYLIST_NAME}'.")
        else:
            print("No new Russian tracks to add.")
    finally:
        # Save updated cache, including after a failure part-way through.
        save_cache({"artists": artist_cache})
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment