Guest User

Immich Media Copy Script

a guest
Feb 5th, 2025
407
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 8.56 KB | None | 0 0
  1. #!/usr/bin/env python3
  2. """
  3. Media Copy Script
  4.  
  5. This script copies Immich files from a structured source directory (organized by year/month/FILE)
  6. to a flat destination folder. It guarantees that files are copied exactly once by tracking
  7. successful copies in a tracking file (already_copied.txt).
  8.  
  9. The media files are renamed using a deterministic scheme to avoid conflicts in the flat destination:
  10.    {year}_{month}_{original_stem}_{hash}{ext}
  11.    
  12. - Filenames are truncated to 255 characters for filesystem compatibility.
  13. - An 8-character hash of the original filename is appended to ensure uniqueness.
  14. - Extensions and stems are normalized to lowercase.
  15.    
  16. The script also cleans up the tracking file by removing entries for files that no longer exist.
  17. """
  18.  
  19. import os
  20. import shutil
  21. import time
  22. import hashlib
  23. from pathlib import Path
  24. from typing import Set
  25.  
  26. # -------------------------------------------------------------------
  27. # Configuration
  28. # -------------------------------------------------------------------
  29. SOURCE_DIR = Path("/mnt/Images/library/admin")
  30. DEST_DIR = Path("/mnt/Images/syncthing/gphoto")
  31. TRACKING_FILE = Path("/mnt/Images/syncthing/already_copied.txt")
  32. VALID_EXTENSIONS = {
  33.     'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tif', 'tiff',
  34.     'heic', 'mov', 'mp4', 'avi', 'mkv', 'hevc'
  35. }
  36.  
  37. # -------------------------------------------------------------------
  38. # Helper Function: get_mangled_name
  39. # -------------------------------------------------------------------
  40. def get_mangled_name(year: str, month: str, filename: str, max_length: int = 255) -> str:
  41.     """
  42.    Generate a unique mangled filename with a length cap.
  43.    Appends an 8-character MD5 hash of the original filename to ensure uniqueness.
  44.  
  45.    Args:
  46.        year (str): Year component derived from the file path.
  47.        month (str): Month component derived from the file path.
  48.        filename (str): The original filename.
  49.        max_length (int, optional): Maximum allowed length for the filename. Defaults to 255.
  50.  
  51.    Returns:
  52.        str: The mangled filename.
  53.    """
  54.     base = f"{year}_{month}_{Path(filename).stem.lower()}"
  55.     ext = Path(filename).suffix.lower()
  56.     # Create an 8-character MD5 hash from the original filename
  57.     name_hash = hashlib.md5(filename.encode()).hexdigest()[:8]
  58.     # Calculate maximum base length accounting for hash, extension, and underscore
  59.     max_base_length = max_length - len(ext) - len(name_hash) - 2
  60.     truncated_base = base[:max_base_length]
  61.     return f"{truncated_base}_{name_hash}{ext}"
  62.  
  63. # -------------------------------------------------------------------
  64. # Function: scan_source_files
  65. # -------------------------------------------------------------------
  66. def scan_source_files() -> Set[str]:
  67.     """
  68.    Traverse the source directory structure (year/month/files) and return
  69.    a set of absolute file paths (as strings) for valid media files.
  70.  
  71.    Returns:
  72.        Set[str]: A set containing paths of valid media files.
  73.    """
  74.     current_files = set()
  75.     for year_entry in os.scandir(SOURCE_DIR):
  76.         if not year_entry.is_dir():
  77.             continue
  78.         for month_entry in os.scandir(year_entry.path):
  79.             if not month_entry.is_dir():
  80.                 continue
  81.             for file_entry in os.scandir(month_entry.path):
  82.                 file_path = Path(file_entry.path)
  83.                 if file_entry.is_file() and is_valid_file(file_path):
  84.                     current_files.add(str(file_path))
  85.     return current_files
  86.  
  87. # -------------------------------------------------------------------
  88. # Function: is_valid_file
  89. # -------------------------------------------------------------------
  90. def is_valid_file(file_path: Path) -> bool:
  91.     """
  92.    Check whether a file's extension is in the set of valid extensions (case-insensitive).
  93.  
  94.    Args:
  95.        file_path (Path): The file path to check.
  96.  
  97.    Returns:
  98.        bool: True if valid, False otherwise.
  99.    """
  100.     return file_path.suffix.lower().lstrip('.') in VALID_EXTENSIONS
  101.  
  102. # -------------------------------------------------------------------
  103. # Function: read_tracking_file
  104. # -------------------------------------------------------------------
  105. def read_tracking_file() -> Set[str]:
  106.     """
  107.    Read and return the set of file paths (as strings) from the tracking file.
  108.  
  109.    Returns:
  110.        Set[str]: A set of file paths that have been copied.
  111.    """
  112.     if not TRACKING_FILE.exists():
  113.         return set()
  114.     with TRACKING_FILE.open('r') as f:
  115.         return {line.strip() for line in f if line.strip()}
  116.  
  117. # -------------------------------------------------------------------
  118. # Function: write_tracking_file
  119. # -------------------------------------------------------------------
  120. def write_tracking_file(entries: Set[str]) -> None:
  121.     """
  122.    Atomically write the provided set of file paths to the tracking file.
  123.    Only updates the file if there are changes.
  124.  
  125.    Args:
  126.        entries (Set[str]): The set of file paths to write.
  127.    """
  128.     if entries == read_tracking_file():
  129.         return
  130.     temp_file = TRACKING_FILE.with_suffix('.tmp')
  131.     with temp_file.open('w') as f:
  132.         for entry in sorted(entries):
  133.             f.write(f"{entry}\n")
  134.     temp_file.replace(TRACKING_FILE)
  135.  
  136. # -------------------------------------------------------------------
  137. # Function: copy_new_files
  138. # -------------------------------------------------------------------
  139. def copy_new_files(current_files: Set[str], tracked_files: Set[str]) -> Set[str]:
  140.     """
  141.    Copy new files from the source to the destination directory.
  142.    Returns the set of file paths that were successfully copied.
  143.  
  144.    Args:
  145.        current_files (Set[str]): All valid file paths currently in the source.
  146.        tracked_files (Set[str]): File paths that have already been copied.
  147.  
  148.    Returns:
  149.        Set[str]: File paths of successfully copied files.
  150.    """
  151.     new_files = current_files - tracked_files
  152.     successfully_copied = set()
  153.     for src_path_str in new_files:
  154.         try:
  155.             src = Path(src_path_str)
  156.             # Extract year and month from the source file's parent directories
  157.             year = src.parent.parent.name
  158.             month = src.parent.name
  159.             mangled_name = get_mangled_name(year, month, src.name)
  160.             dest_path = DEST_DIR / mangled_name
  161.             print(f"Copying {src} -> {dest_path}")
  162.             shutil.copy2(src, dest_path)
  163.             successfully_copied.add(src_path_str)
  164.         except Exception as e:
  165.             print(f"Error copying {src_path_str}: {e}")
  166.     return successfully_copied
  167.  
  168. # -------------------------------------------------------------------
  169. # Function: cleanup_tracking_file
  170. # -------------------------------------------------------------------
  171. def cleanup_tracking_file(current_files: Set[str]) -> None:
  172.     """
  173.    Remove tracking entries for files that no longer exist in the source.
  174.    
  175.    Args:
  176.        current_files (Set[str]): The set of file paths currently present in the source.
  177.    """
  178.     tracked_files = read_tracking_file()
  179.     updated_tracked = {file for file in tracked_files if Path(file).exists()}
  180.     if updated_tracked != tracked_files:
  181.         write_tracking_file(updated_tracked)
  182.  
  183. # -------------------------------------------------------------------
  184. # Main Processing Loop
  185. # -------------------------------------------------------------------
  186. def main():
  187.     """
  188.    Main loop of the script:
  189.      1. Reads previously tracked (copied) file paths.
  190.      2. Scans the source directory for current valid files.
  191.      3. Copies new files and updates the tracking file accordingly.
  192.      4. Cleans up the tracking file by removing non-existent entries.
  193.    """
  194.     # Ensure the destination directory exists
  195.     DEST_DIR.mkdir(parents=True, exist_ok=True)
  196.    
  197.     # Read the list of already copied files
  198.     tracked_files = read_tracking_file()
  199.    
  200.     # Scan the source directory for valid media files
  201.     current_files = scan_source_files()
  202.    
  203.     # Copy new files and capture which ones were successfully copied
  204.     newly_copied = copy_new_files(current_files, tracked_files)
  205.    
  206.     # Update the tracking file with the new successful copies
  207.     updated_tracked = tracked_files.union(newly_copied)
  208.     write_tracking_file(updated_tracked)
  209.    
  210.     # Cleanup the tracking file by removing entries for files no longer in the source
  211.     cleanup_tracking_file(current_files)
  212.  
  213. # -------------------------------------------------------------------
  214. # Entry Point
  215. # -------------------------------------------------------------------
  216. if __name__ == "__main__":
  217.     main()
  218.  
Advertisement
Add Comment
Please, Sign In to add comment