Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- """
- Media Copy Script
- This script copies Immich files from a structured source directory (organized by year/month/FILE)
- to a flat destination folder. It guarantees that files are copied exactly once by tracking
- successful copies in a tracking file (already_copied.txt).
- The media files are renamed using a deterministic scheme to avoid conflicts in the flat destination:
- {year}_{month}_{original_stem}_{hash}{ext}
- - Filenames are truncated to 255 characters for filesystem compatibility.
- - An 8-character hash of the original filename is appended to ensure uniqueness.
- - Extensions and stems are normalized to lowercase.
- The script also cleans up the tracking file by removing entries for files that no longer exist.
- """
- import os
- import shutil
- import time
- import hashlib
- from pathlib import Path
- from typing import Set
- # -------------------------------------------------------------------
- # Configuration
- # -------------------------------------------------------------------
# Root of the year/month/file tree that is scanned for media.
SOURCE_DIR = Path("/mnt/Images/library/admin")
# Flat folder that receives the renamed copies.
DEST_DIR = Path("/mnt/Images/syncthing/gphoto")
# Text file holding one source path per line for every file already copied.
TRACKING_FILE = Path("/mnt/Images/syncthing/already_copied.txt")
# Lowercase extensions (without the leading dot) that count as media.
VALID_EXTENSIONS = {
    "avi",
    "bmp",
    "gif",
    "heic",
    "hevc",
    "jpeg",
    "jpg",
    "mkv",
    "mov",
    "mp4",
    "png",
    "tif",
    "tiff",
}
- # -------------------------------------------------------------------
- # Helper Function: get_mangled_name
- # -------------------------------------------------------------------
def get_mangled_name(year: str, month: str, filename: str, max_length: int = 255) -> str:
    """
    Build the flat-destination filename for a source media file.

    The result has the form ``{year}_{month}_{stem}_{hash}{ext}``. The stem and
    extension are lowercased; ``hash`` is the first 8 hex digits of the MD5 of
    the original, case-preserved filename, so two names that differ only in
    case still get different hashes (barring an 8-digit hash collision).

    The length cap is enforced on the UTF-8 encoded size, because filesystem
    name limits are counted in bytes rather than characters. Only the
    ``{year}_{month}_{stem}`` part is shortened; the hash and the extension are
    always kept in full.

    Args:
        year (str): Year component derived from the file path.
        month (str): Month component derived from the file path.
        filename (str): The original filename.
        max_length (int, optional): Maximum allowed size of the result in
            UTF-8 bytes. Defaults to 255.
    Returns:
        str: The mangled filename.
    """
    source_name = Path(filename)
    base = f"{year}_{month}_{source_name.stem.lower()}"
    ext = source_name.suffix.lower()
    # MD5 serves only as a cheap, stable fingerprint here, not for security.
    name_hash = hashlib.md5(filename.encode()).hexdigest()[:8]

    # Reserve room for the extension, the hash and the joining underscore.
    # One extra byte of slack is kept so that names which were already being
    # truncated keep mapping to the same result. Clamp at zero: a negative
    # slice bound would cut from the END of the base instead of emptying it.
    budget = max(0, max_length - len(ext.encode()) - len(name_hash) - 2)

    encoded_base = base.encode()
    if len(encoded_base) > budget:
        # Cut on a byte boundary, then silently drop a multi-byte character
        # that the cut may have split in half.
        base = encoded_base[:budget].decode(errors="ignore")

    return f"{base}_{name_hash}{ext}"
- # -------------------------------------------------------------------
- # Function: scan_source_files
- # -------------------------------------------------------------------
def scan_source_files() -> Set[str]:
    """
    Collect every valid media file stored two levels below SOURCE_DIR.

    The source tree is expected to look like ``year/month/file``. Anything that
    is not a directory at the year or month level is skipped, and at the file
    level only regular files accepted by ``is_valid_file`` are kept.

    Returns:
        Set[str]: Paths (as strings) of the valid media files that were found.
    """
    def subdirectories(parent):
        """Return an iterator over the directory entries directly under ``parent``."""
        return (entry for entry in os.scandir(parent) if entry.is_dir())

    found = set()
    for year_dir in subdirectories(SOURCE_DIR):
        for month_dir in subdirectories(year_dir.path):
            for candidate in os.scandir(month_dir.path):
                candidate_path = Path(candidate.path)
                if candidate.is_file() and is_valid_file(candidate_path):
                    found.add(str(candidate_path))
    return found
- # -------------------------------------------------------------------
- # Function: is_valid_file
- # -------------------------------------------------------------------
def is_valid_file(file_path: Path) -> bool:
    """
    Tell whether a path carries one of the accepted media extensions.

    The comparison ignores case and the leading dot of the suffix.

    Args:
        file_path (Path): The file path to check.
    Returns:
        bool: True when the extension is listed in VALID_EXTENSIONS.
    """
    extension = file_path.suffix.lower().lstrip('.')
    return extension in VALID_EXTENSIONS
- # -------------------------------------------------------------------
- # Function: read_tracking_file
- # -------------------------------------------------------------------
def read_tracking_file() -> Set[str]:
    """
    Load the set of already-copied source paths from the tracking file.

    Each non-blank line of TRACKING_FILE is one entry; surrounding whitespace
    is stripped. A missing tracking file is treated as "nothing copied yet".

    Returns:
        Set[str]: The recorded file paths, or an empty set when the tracking
        file does not exist.
    """
    # Open first and handle absence afterwards: checking exists() and then
    # opening leaves a window in which the file can disappear and crash the run.
    try:
        with TRACKING_FILE.open('r') as f:
            stripped_lines = (line.strip() for line in f)
            return {entry for entry in stripped_lines if entry}
    except (FileNotFoundError, NotADirectoryError):
        # NotADirectoryError covers a path component that is a regular file,
        # which exists() also reports as "not there".
        return set()
- # -------------------------------------------------------------------
- # Function: write_tracking_file
- # -------------------------------------------------------------------
def write_tracking_file(entries: Set[str]) -> None:
    """
    Persist the given paths as the new content of the tracking file.

    Nothing is written when the file already holds exactly these entries.
    Otherwise the sorted entries are written, one per line, to a sibling
    ``.tmp`` file which then replaces the tracking file in a single step.

    Args:
        entries (Set[str]): The set of file paths to write.
    """
    if read_tracking_file() == entries:
        return

    staging_path = TRACKING_FILE.with_suffix('.tmp')
    with staging_path.open('w') as handle:
        handle.writelines(f"{entry}\n" for entry in sorted(entries))
    # Swap the finished file into place so readers never see a partial write.
    staging_path.replace(TRACKING_FILE)
- # -------------------------------------------------------------------
- # Function: copy_new_files
- # -------------------------------------------------------------------
def copy_new_files(current_files: Set[str], tracked_files: Set[str]) -> Set[str]:
    """
    Copy every source file that is not tracked yet into DEST_DIR.

    The destination name is produced by ``get_mangled_name`` from the names of
    the two parent directories (year and month) and the file's own name. A
    failure on one file is reported on stdout and does not stop the others.

    Args:
        current_files (Set[str]): All valid file paths currently in the source.
        tracked_files (Set[str]): File paths that have already been copied.
    Returns:
        Set[str]: Source paths whose copy completed without an exception.
    """
    copied = set()
    for pending in current_files - tracked_files:
        try:
            source = Path(pending)
            # Layout is year/month/file, so the parent is the month directory
            # and its parent is the year directory.
            month_dir = source.parent
            target_name = get_mangled_name(
                month_dir.parent.name, month_dir.name, source.name
            )
            target = DEST_DIR / target_name
            print(f"Copying {source} -> {target}")
            shutil.copy2(source, target)
        except Exception as e:
            print(f"Error copying {pending}: {e}")
        else:
            copied.add(pending)
    return copied
- # -------------------------------------------------------------------
- # Function: cleanup_tracking_file
- # -------------------------------------------------------------------
def cleanup_tracking_file(current_files: Set[str]) -> None:
    """
    Drop tracking entries whose source file is gone.

    An entry is kept when its path was seen by the latest source scan. Entries
    that were not seen by the scan are only dropped if the path really does
    not exist on disk, so a tracked file that the scan did not report (for
    example because it is no longer considered a valid media file) is kept.

    The tracking file is rewritten only when at least one entry was removed.

    Args:
        current_files (Set[str]): The set of file paths currently present in
            the source, as returned by the scan.
    """
    tracked_files = read_tracking_file()
    # Membership in the scan result answers the question for almost every
    # entry without touching the filesystem; the exists() call is only reached
    # for entries the scan did not report.
    updated_tracked = {
        entry
        for entry in tracked_files
        if entry in current_files or Path(entry).exists()
    }
    if updated_tracked != tracked_files:
        write_tracking_file(updated_tracked)
- # -------------------------------------------------------------------
- # Main Processing Loop
- # -------------------------------------------------------------------
def main():
    """
    Run one synchronisation pass.

    Steps, in order:
    1. Make sure the destination directory exists.
    2. Load the paths recorded as already copied.
    3. Scan the source tree for valid media files.
    4. Copy the files that are not tracked yet and record the successful ones.
    5. Remove tracking entries for files that no longer exist.
    """
    # The destination may be missing on a first run.
    DEST_DIR.mkdir(parents=True, exist_ok=True)

    already_tracked = read_tracking_file()
    present_in_source = scan_source_files()

    # Only copies that succeeded are added, so failed files are retried on
    # the next run.
    copied_now = copy_new_files(present_in_source, already_tracked)
    write_tracking_file(already_tracked | copied_now)

    # Finally forget entries whose source file has disappeared.
    cleanup_tracking_file(present_in_source)


# -------------------------------------------------------------------
# Entry Point
# -------------------------------------------------------------------
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment