Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
"""Textual UI for reclaimed with interactive file/folder management."""

import asyncio
import logging
import os
import shutil
import time
from pathlib import Path
from typing import Callable, List, Optional

from rich.text import Text
from textual import on
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal
from textual.screen import ModalScreen
from textual.widgets import (
    Button,
    DataTable,
    Footer,
    Header,
    LoadingIndicator,
    RadioButton,
    RadioSet,
    Static,
)
from textual.worker import Worker, WorkerState

from ..core import DiskScanner, FileInfo, ScanOptions
from ..utils.formatters import format_size
from .styles import TEXTUAL_CSS

# Module-level logger, named by dotted path so it nests under a
# package-wide "reclaimed" logger if the application configures one.
logger = logging.getLogger("reclaimed.textual_app")


def configure_logging(debug: bool = False) -> None:
    """Configure logging for the Textual UI.

    Args:
        debug: When True, enable DEBUG-level output; otherwise INFO.

    The handler is only attached once so repeated calls (e.g. multiple
    invocations of run_textual_ui in one process) do not duplicate output.
    """
    level = logging.DEBUG if debug else logging.INFO
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(
                fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        logger.addHandler(handler)
    logger.setLevel(level)


class ProgressManager:
    """Manages progress bar lifecycle to prevent duplicate IDs and provide smoother updates."""

    def __init__(self, app: App, container_id: str):
        """Initialize the progress manager.

        Args:
            app: The parent Textual app
            container_id: ID of the container to mount progress bars in
        """
        self.app = app
        self.container_id = container_id
        self.last_update_time = 0
        self.update_interval = 0.1  # Update at most 10 times per second
        self.last_progress_value = 0
        self.min_progress_increment = 0.005  # Minimum 0.5% change to update


class ConfirmationDialog(ModalScreen):
    """A modal dialog for confirming file/folder deletion."""

    def __init__(self, item_path: Path, is_dir: bool = False):
        super().__init__()
        self.item_path = item_path
        self.is_dir = is_dir
        self.item_type = "directory" if is_dir else "file"

    def compose(self) -> ComposeResult:
        """Compose the confirmation dialog."""
        with Container(id="dialog-container"):
            yield Static(
                f"Are you sure you want to delete this {self.item_type}?", id="dialog-title"
            )
            yield Static(f"[bold red]{self.item_path}[/]", id="dialog-path")
            if self.is_dir:
                yield Static("[yellow]Warning: This will delete all contents recursively![/]")
            with Horizontal(id="dialog-buttons"):
                yield Button("Cancel", variant="primary", id="cancel-button")
                yield Button("Delete", variant="error", id="confirm-button")

    @on(Button.Pressed, "#cancel-button")
    def cancel_deletion(self) -> None:
        """Cancel the deletion operation."""
        self.dismiss(False)

    @on(Button.Pressed, "#confirm-button")
    def confirm_deletion(self) -> None:
        """Confirm the deletion operation."""
        self.dismiss(True)


class SortOptions(ModalScreen):
    """A modal dialog for selecting sort options."""

    def compose(self) -> ComposeResult:
        """Compose the sort options dialog."""
        with Container(id="sort-container"):
            yield Static("Sort by:", id="sort-title")
            with RadioSet(id="sort-options"):
                yield RadioButton("Size (largest first)", id="sort-size", value=True)
                yield RadioButton("Name (A-Z)", id="sort-name")
                yield RadioButton("Path (A-Z)", id="sort-path")
            with Horizontal(id="sort-buttons"):
                yield Button("Cancel", variant="primary", id="sort-cancel")
                yield Button("Apply", variant="success", id="sort-apply")

    @on(Button.Pressed, "#sort-cancel")
    def cancel_sort(self) -> None:
        """Cancel the sort operation."""
        self.dismiss(None)

    @on(Button.Pressed, "#sort-apply")
    def apply_sort(self) -> None:
        """Apply the selected sort option."""
        sort_option = self.query_one("#sort-options").pressed_button.id
        self.dismiss(sort_option)


class ReclaimedApp(App):
    """Textual app for reclaimed with interactive file management."""

    CSS = TEXTUAL_CSS

    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("f", "focus_files", "Focus Files"),
        Binding("d", "focus_dirs", "Focus Directories"),
        Binding("tab", "toggle_focus", "Toggle Focus"),
        Binding("s", "sort", "Sort"),
        Binding("r", "refresh", "Refresh"),
        Binding("delete", "delete_selected", "Delete"),
        Binding("?", "help", "Help"),
    ]

    def __init__(
        self, path: Path, options: ScanOptions, on_exit_callback: Optional[Callable] = None
    ):
        """Initialize the app with the path to scan.

        Args:
            path: Directory to scan
            options: Scan configuration options
            on_exit_callback: Optional callback to run on exit
        """
        super().__init__()
        self.path = path.resolve()
        self.options = options
        self.on_exit_callback = on_exit_callback
        self.scanner = DiskScanner(options)
        self.largest_files: List[FileInfo] = []
        self.largest_dirs: List[FileInfo] = []
        self.current_focus = "files"  # Tracks which table has focus
        self.sort_method = "sort-size"  # Default sort method
        self.progress_manager = None  # Will be initialized after mount
        logger.debug("ReclaimedApp initialized for path: %s", self.path)

    def compose(self) -> ComposeResult:
        """Compose the app layout."""
        yield Header(show_clock=True)
        yield Static("[bold]Reclaimed[/bold]", id="title")
        with Container(id="main-container"):
            # Status bar with scan info
            with Horizontal(id="status-bar"):
                yield Static("Path:", id="status-label")
                yield Static(f"{self.path}", id="path-display")
                yield Static("", id="scan-timer")
                yield Static("", id="scan-count")

            # Directories section
            yield Static("[bold]Largest Directories[/bold]", id="dirs-section-header")
            dirs_table = DataTable(id="dirs-table")
            dirs_table.add_columns("Size", "Storage", "Path")
            yield dirs_table

            # Files section
            yield Static("[bold]Largest Files[/bold]", id="files-section-header")
            files_table = DataTable(id="files-table")
            files_table.add_columns("Size", "Storage", "Path")
            yield files_table

        with Horizontal(id="footer-container"):
            yield Footer()
            yield LoadingIndicator(id="scan-progress")

    def on_mount(self) -> None:
        """Event handler called when the app is mounted."""
        # Initialize progress manager
        self.progress_manager = ProgressManager(self, "main-container")

        # Start the initial scan
        self.scan_directory()

        # Set initial focus to the files table after scan completes
        self.set_timer(0.1, self.focus_active_table)

        # Check header visibility again after a short delay
        self.set_timer(1.0, self.check_header_visibility)

    def scan_directory(self) -> None:
        """Scan the directory and update the tables incrementally."""
        logger.info("Starting scan of %s", self.path)

        # Reset state before starting new scan
        self.largest_files = []
        self.largest_dirs = []

        # Start timing with monotonic clock
        self.start_time = time.monotonic()

        # Notify user that scan is starting
        self.notify("Starting directory scan...", timeout=2)

        # Reset sort tracking
        self._files_sorted = False
        self._dirs_sorted = False

        # Show loading indicator
        loading = self.query_one("#scan-progress")
        loading.styles.display = "block"

        # Start async scan with optimized worker function
        self.scan_task = self.run_worker(
            self._scan_directory_worker(),
            name="Directory Scanner",
            description="Scanning directory...",
        )

    async def _scan_directory_worker(self):
        """Worker function to process async generator from scan_async with optimized UI updates."""
        # Track when we last updated the UI
        last_ui_update = 0
        base_ui_update_interval = 0.5

        # Get UI elements once
        timer_display = self.query_one("#scan-timer")
        count_display = self.query_one("#scan-count")

        # Create independent timer task
        async def update_timer():
            start = time.monotonic()
            while True:
                elapsed = time.monotonic() - start
                minutes, seconds = divmod(int(elapsed), 60)
                timer_display.update(f"Time: {minutes:02d}:{seconds:02d}")
                await asyncio.sleep(0.05)  # Update 20 times per second for smooth display

        # Start timer task and store reference
        self._timer_task = asyncio.create_task(update_timer())

        # Buffers to collect data between UI updates
        files_buffer = []
        dirs_buffer = []
        last_file_count = 0

        # Initialize progress with default values in case of early exception
        progress = None
        current_time = time.monotonic()

        try:
            async for progress in self.scanner.scan_async(self.path):
                if not progress:
                    continue

                # Update our data in memory
                if progress.files:
                    files_buffer = progress.files
                if progress.dirs:
                    dirs_buffer = progress.dirs

                # Update file count independently
                count_display.update(f"Files: {progress.scanned:,}")

                # Dynamically adjust update interval based on files scanned
                ui_update_interval = base_ui_update_interval
                if progress.scanned > 100000:
                    ui_update_interval = 5.0
                elif progress.scanned > 50000:
                    ui_update_interval = 3.0
                elif progress.scanned > 10000:
                    ui_update_interval = 2.0
                elif progress.scanned > 5000:
                    ui_update_interval = 1.0

                # Check if it's time to update tables
                current_time = time.monotonic()
                if current_time - last_ui_update > ui_update_interval:
                    self.largest_files = files_buffer
                    self.largest_dirs = dirs_buffer
                    self.apply_sort(self.sort_method)
                    self.update_tables()
                    last_ui_update = current_time
                    last_file_count = progress.scanned
                await asyncio.sleep(0)
        except Exception as e:
            logger.exception("Directory scan failed")
            self.notify(f"Scan error: {str(e)}", severity="error")
            raise
        finally:
            # Always clean up the timer task
            if hasattr(self, "_timer_task"):
                self._timer_task.cancel()
                try:
                    await self._timer_task
                except asyncio.CancelledError:
                    pass

        # Dynamically adjust update interval based on files scanned
        ui_update_interval = base_ui_update_interval
        # Only process progress data if we have a valid progress object
        if progress is not None:
            if progress.scanned > 100000:
                ui_update_interval = 5.0  # Very infrequent updates for huge directories
            elif progress.scanned > 50000:
                ui_update_interval = 3.0  # Very infrequent updates for very large directories
            elif progress.scanned > 10000:
                ui_update_interval = 2.0  # Less frequent updates for large directories
            elif progress.scanned > 5000:
                ui_update_interval = 1.0  # Moderate updates for medium directories

            # Force an update if we've scanned a lot more files since the last update
            # This ensures we show progress even during long update intervals
            force_update = progress.scanned - last_file_count > 5000

            # Use adaptive interval between UI updates
            time_to_update = current_time - last_ui_update > ui_update_interval

            # Only update UI periodically, on completion, or when forced
            if time_to_update or progress.progress >= 1.0 or force_update:
                # Update our data
                self.largest_files = files_buffer
                self.largest_dirs = dirs_buffer

                # Apply sort and update tables
                self.apply_sort(self.sort_method)
                self.update_tables()
                last_ui_update = current_time
                last_file_count = progress.scanned

                # Brief yield to allow UI to update, but keep it minimal
                await asyncio.sleep(0)

        # Return final data
        return {
            "files": self.largest_files,
            "dirs": self.largest_dirs,
            "total_size": self.scanner._total_size,
            "file_count": self.scanner._file_count,
        }

    async def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
        """Handle updates from the background scan task with optimized UI updates."""
        if event.worker.name != "Directory Scanner":
            return

        # Get loading indicator
        loading = self.query_one("#scan-progress")

        if event.worker.state == WorkerState.SUCCESS:
            # Hide loading indicator
            loading.styles.display = "none"

            # Get result data from worker
            file_count = 0
            if event.worker.result:
                result = event.worker.result
                file_count = result.get("file_count", 0)

                # Only update UI if we have new data
                if "files" in result and result["files"]:
                    self.largest_files = result["files"]
                    self._files_sorted = False
                if "dirs" in result and result["dirs"]:
                    self.largest_dirs = result["dirs"]
                    self._dirs_sorted = False

            # Get elapsed time for notification
            elapsed = time.monotonic() - self.start_time
            logger.info("Scan complete in %.1fs; found %d files", elapsed, file_count)

            # Update final file count
            count_display = self.query_one("#scan-count")
            count_display.update(f"Files: {file_count:,}")

            # Show completion notification
            self.notify(f"Scan complete in {elapsed:.1f}s. Found {file_count:,} files.", timeout=5)

            # Clean up timer task
            if hasattr(self, "_timer_task"):
                self._timer_task.cancel()
                try:
                    await self._timer_task
                except asyncio.CancelledError:
                    pass

            # Apply sort and update tables only once at the end
            self.apply_sort(self.sort_method)
            self.update_tables()

            # focus the active table
            self.focus_active_table()

        elif event.worker.state == WorkerState.ERROR:
            # Hide loading indicator
            loading.styles.display = "none"
            logger.error("Scan worker ended in ERROR state")
            self.notify("Scan failed!", severity="error")

    # Track last table update to avoid redundant updates.
    # NOTE(review): class-level mutable dicts are shared across instances;
    # acceptable while the app is a singleton, but worth moving to __init__.
    _last_table_update = {}
    _last_table_items = {}

    def update_tables(self) -> None:
        """Update both data tables with current data, avoiding redundant updates."""
        # Update files table if data has changed
        self._update_table_if_changed("#files-table", self.largest_files)

        # Update dirs table if data has changed
        self._update_table_if_changed("#dirs-table", self.largest_dirs)

    def _update_table_if_changed(self, table_id: str, items: List[FileInfo]) -> None:
        """Update a table only if its data has changed significantly.

        Args:
            table_id: CSS selector for the table
            items: List of FileInfo objects to display
        """
        # Skip update if no items
        if not items:
            return

        # Check if data has changed significantly
        current_items = self._last_table_items.get(table_id, [])

        # If item count is the same, check if top items are the same
        if len(current_items) == len(items):
            # Only check the first few items for performance
            check_count = min(5, len(items))
            items_changed = False
            for i in range(check_count):
                if (
                    i >= len(current_items)
                    or items[i].path != current_items[i].path
                    or items[i].size != current_items[i].size
                ):
                    items_changed = True
                    break
            if not items_changed:
                # Data hasn't changed significantly, skip update
                return

        # Update last items
        self._last_table_items[table_id] = items

        # Now update the table
        self._update_table(table_id, items)

    def _update_table(self, table_id: str, items: List[FileInfo]) -> None:
        """Helper method to update a specific table with items.

        Args:
            table_id: CSS selector for the table
            items: List of FileInfo objects to display
        """
        table = self.query_one(table_id)
        table.clear()
        table.can_focus = True

        # Skip update if no items
        if not items:
            return

        # Limit the number of items to display for better performance
        display_items = items[: min(100, len(items))]

        # Render all items at once - Textual's DataTable has built-in virtualization
        for item_info in display_items:
            self._add_row_to_table(table, item_info)

    def _add_row_to_table(self, table, item_info: FileInfo) -> None:
        """Add a single row to a table.

        Args:
            table: The DataTable to add the row to
            item_info: FileInfo object with data for the row
        """
        try:
            rel_path = item_info.path.relative_to(self.path)
        except ValueError:
            rel_path = item_info.path

        storage_status = "☁️ iCloud" if item_info.is_icloud else "💾 Local"
        storage_cell = Text(storage_status, style="#268bd2" if item_info.is_icloud else "#859900")

        table.add_row(
            format_size(item_info.size), storage_cell, str(rel_path), key=str(item_info.path)
        )

    # Track current sort state to avoid redundant sorts
    _current_sort_method = "sort-size"
    _files_sorted = False
    _dirs_sorted = False

    def apply_sort(self, sort_method: str) -> None:
        """Apply the selected sort method to the data, avoiding redundant sorts."""
        # Skip if no data to sort
        if not self.largest_files and not self.largest_dirs:
            return

        # Skip if sort method hasn't changed and data is already sorted
        if sort_method == self._current_sort_method and self._files_sorted and self._dirs_sorted:
            return

        # Define sort keys based on method
        sort_keys = {
            "sort-size": lambda x: -x.size,  # Negative for descending order
            "sort-name": lambda x: x.path.name.lower(),
            "sort-path": lambda x: str(x.path).lower(),
        }

        # Get the appropriate sort key function
        key_func = sort_keys.get(sort_method)
        if not key_func:
            return  # Invalid sort method

        # Only sort if we have data and sort method has changed
        if self.largest_files:
            self.largest_files.sort(key=key_func)
            self._files_sorted = True
        if self.largest_dirs:
            self.largest_dirs.sort(key=key_func)
            self._dirs_sorted = True

        # Update current sort method
        self._current_sort_method = sort_method

    def action_focus_files(self) -> None:
        """Focus the files table."""
        self.current_focus = "files"
        self.focus_active_table()

    def action_focus_dirs(self) -> None:
        """Focus the directories table."""
        self.current_focus = "dirs"
        self.focus_active_table()

    def action_toggle_focus(self) -> None:
        """Toggle focus between files and directories tables."""
        self.current_focus = "dirs" if self.current_focus == "files" else "files"
        self.focus_active_table()

    def action_sort(self) -> None:
        """Show the sort options dialog."""

        def handle_sort_result(sort_option: Optional[str]) -> None:
            if sort_option:
                self.sort_method = sort_option
                self.apply_sort(sort_option)
                self.update_tables()
                self.focus_active_table()

        self.push_screen(SortOptions(), handle_sort_result)

    def action_refresh(self) -> None:
        """Refresh the directory scan."""
        self.scan_directory()

    def action_delete_selected(self) -> None:
        """Delete the selected file or directory."""
        # Get the current table based on the focus
        table = self.query_one("#files-table" if self.current_focus == "files" else "#dirs-table")

        # Check if a row is selected
        if table.cursor_coordinate is not None:
            row = table.cursor_coordinate.row
            if row < len(table.rows):
                # Get the path from the row key
                # Get row data (unused but kept for potential future use)
                table.get_row_at(row)

                # In the current version of Textual, we need to access the key differently
                # The key is stored when we add the row, so we need to look it up in our data
                if self.current_focus == "files" and row < len(self.largest_files):
                    path = self.largest_files[row].path
                elif self.current_focus == "dirs" and row < len(self.largest_dirs):
                    path = self.largest_dirs[row].path
                else:
                    self.notify("Could not determine the path for this item", timeout=5)
                    return

                is_dir = path.is_dir()

                # Show confirmation dialog
                def handle_confirmation(confirmed: bool) -> None:
                    if confirmed:
                        try:
                            if is_dir:
                                shutil.rmtree(path)
                            else:
                                os.remove(path)
                            logger.info("Deleted %s: %s", "directory" if is_dir else "file", path)
                            self.notify(f"Successfully deleted {path}", timeout=5)
                        except Exception as e:
                            logger.exception("Failed to delete %s", path)
                            self.notify(f"Error deleting {path}: {e}", timeout=5)

                self.push_screen(ConfirmationDialog(path, is_dir), handle_confirmation)

    def action_help(self) -> None:
        """Show help information."""
        help_text = """
        [#93a1a1]Reclaimed Help[/]

        [#268bd2]Navigation:[/]
        - Arrow keys: Navigate within a table
        - F: Focus Files table
        - D: Focus Directories table
        - Tab: Move between tables

        [#268bd2]Actions:[/]
        - Delete: Delete selected item
        - S: Sort items
        - R: Refresh scan
        - Q: Quit application

        [#268bd2]Selection:[/]
        - Click on a row to select it
        - Press Delete to remove the selected item
        """
        self.notify(help_text, timeout=10)

    # Tab button handlers removed as we now have a unified view

    def on_data_table_row_selected(self, event) -> None:
        """Handle row selection in data tables."""
        table_id = event.data_table.id
        row = event.cursor_coordinate.row

        # Update current_focus based on which table was selected
        if table_id == "files-table":
            items = self.largest_files
            self.current_focus = "files"
        else:
            items = self.largest_dirs
            self.current_focus = "dirs"

        if 0 <= row < len(items):
            path = items[row].path
            self.notify(f"Selected: {path}", timeout=3)

    def check_header_visibility(self) -> None:
        """Check header visibility after a delay (debug-level diagnostics)."""
        try:
            # Debug header visibility via the module logger (lazy %-formatting)
            dirs_header = self.query_one("#dirs-section-header")
            files_header = self.query_one("#files-section-header")
            logger.debug("dirs_header visible: %s", dirs_header.styles.display)
            logger.debug("files_header visible: %s", files_header.styles.display)
            logger.debug("dirs_header text: %s", dirs_header.render())
            logger.debug("files_header text: %s", files_header.render())

            # Check the DOM order
            all_widgets = list(self.query("Static"))
            logger.debug("Widget order in DOM:")
            for i, widget in enumerate(all_widgets):
                logger.debug("%d: %s - %s", i, widget.id, widget.render())
        except Exception as e:
            logger.debug("Error checking headers: %s", e)

    def focus_active_table(self) -> None:
        """Focus the currently active table based on current_focus."""
        table_id = "#files-table" if self.current_focus == "files" else "#dirs-table"
        table = self.query_one(table_id)

        # Only set focus if the table has rows
        if len(table.rows) > 0:
            self.set_focus(table)

            # Set cursor to first row if no row is selected
            if table.cursor_coordinate is None:
                table.move_cursor(row=0, column=0)

    def on_unmount(self) -> None:
        """Event handler called when app is unmounted."""
        if self.on_exit_callback:
            self.on_exit_callback()


def run_textual_ui(
    path: Path,
    max_files: int = 100,
    max_dirs: int = 100,
    skip_dirs: Optional[List[str]] = None,
    debug: bool = False,
) -> None:
    """Run the Textual UI application.

    Args:
        path: Directory to scan
        max_files: Maximum number of files to show
        max_dirs: Maximum number of directories to show
        skip_dirs: List of directory names to skip
        debug: Enable DEBUG-level logging output
    """
    configure_logging(debug)
    logger.debug(
        "run_textual_ui(path=%s, max_files=%d, max_dirs=%d, skip_dirs=%s)",
        path,
        max_files,
        max_dirs,
        skip_dirs,
    )

    if skip_dirs is None:
        skip_dirs = [".Trash", "System Volume Information"]

    options = ScanOptions(max_files=max_files, max_dirs=max_dirs, skip_dirs=skip_dirs)
    app = ReclaimedApp(path, options)
    app.run()
- <think>Okay, I need to implement high-quality logging with an optional debug mode for the provided Python script. Let me start by understanding the code structure and where logging is necessary.
- First, the script uses the Textual framework for building a UI, and there are several classes like ReclaimedApp, ConfirmationDialog, SortOptions, and ProgressManager. The main function is run_textual_ui which initializes and runs the app. The existing code has some print statements in the check_header_visibility method which are likely for debugging, but they should be replaced with proper logging.
- The user wants optional debug mode, so I should create a way to enable or disable debug logging. Usually, this can be done via command-line arguments or an environment variable. Since the entry point is the run_textual_ui function, maybe adding a debug parameter there would be best. Alternatively, the ReclaimedApp could take a debug flag, but I'll see.
- I need to import the logging module. Let me structure the logging so that there's a logger instance in the app. The logger should have handlers configured to output to console, with debug level when enabled. The existing print statements in check_header_visibility, like the ones starting with "DEBUG:", should be turned into logger.debug calls.
- Also, in the _scan_directory_worker, there are some try-except blocks and finally clauses. Adding error logging there would help catch issues during scanning. The on_worker_state_changed could log the success or failure of the worker. The delete operation in action_delete_selected could log if there's an exception when removing files or directories. The help method might not need logging, but other methods like scan_directory, action_sort, etc., should have appropriate log messages.
- Let me think about the steps:
- At the top of the script, import logging and set up the logger. The logger should have a name, maybe "reclaimed.textual_app".
- Add a logging configuration function. This function would set the logging level to DEBUG if debug mode is enabled, else to INFO or WARNING. The output should go to the console with a nice format.
- Modify the run_textual_ui function to accept a debug parameter. Then, when creating ReclaimedApp, pass the debug flag, or have the ReclaimedApp initialize the logger with that setting.
- Wait, the ReclaimedApp's init already takes options and on_exit_callback. Maybe add a debug parameter to its init so that the app can set up the logger accordingly.
- Alternatively, configuration can be done once when the script starts, so maybe in run_textual_ui, we can set up logging based on the debug flag passed. That might be better because the main function is where the app is launched, so the debug mode can be enabled there via an argument.
- But the user might want the debug mode to be optional when running the script. So adding a debug flag to the run_textual_ui function's parameters would make sense. Let's adjust the run_textual_ui signature to include debug: bool = False.
- Then, inside run_textual_ui, set up the logging with that parameter. For example:
- def run_textual_ui(path: Path, ..., debug: bool = False):
- options = ...
- logger = logging.getLogger("reclaimed.textual_app")
- configure_logging(debug) # Or setup here directly
- app = ReclaimedApp(...)
- app.run()
- Wait, perhaps better to have a helper function to configure logging. Let me create a configure_logging function that takes a debug flag.
- Also, in the ReclaimedApp, methods like scan_directory, action_delete_selected, etc., can log messages. Let's see existing print statements:
- In check_header_visibility, there are several prints. Replace those with logger.debug calls.
- Additionally, in the except blocks, like the one in the finally clause of _scan_directory_worker, we can log exceptions. Similarly, in the action_delete_selected method, the except block can log the error.
- The ProgressManager class may also need logging, but since it's a helper class inside the app, maybe the logger from the app can be used there. However, since it's a separate class, maybe we should have it use the same logger. Let's check how the ProgressManager is initialized: it takes app as a parameter. So the ProgressManager can access app.logger to get the logger instance. Alternatively, maybe pass the logger to the ProgressManager's init, but perhaps better to use the app's logger.
- Wait, maybe the ProgressManager should have its own logger. Hmm, but the ProgressManager is part of the app's internal structure, so maybe it's okay to use the app's logger. Let me think that the ReclaimedApp will have a logger attribute, which is initialized in its init. Then, ProgressManager can just use self.app.logger.
- Alternatively, perhaps the ProgressManager doesn't need to log anything. Let's see what the original code does. The ProgressManager has init but no logging in there. So maybe the logging is mainly in the app's methods.
- Now, setting up the logger for the app:
- In init of ReclaimedApp, after calling super().init(), set up the logger. Wait, but the logger is typically configured once at the top. Maybe the configure_logging function should be called in run_textual_ui, and then the app can use that logger. Wait, but Textual apps run in their own context, perhaps it's better to set the logger within the app's initialization.
- Alternatively, use a module-level logger. Let me check.
- The current code starts with:
- import asyncio import os import shutil import time from pathlib import Path from typing import Callable, List, Optional
- So adding logging would be:
- import logging
- Then, create a logger for the module:
- logger = logging.getLogger("reclaimed.textual_app")
- Wait, but maybe each class can have its own logger with class names. Alternatively, have one logger for the whole module. Since the user wants this for the textual_app.py, a module-level logger would be appropriate.
- Wait, but in the ReclaimedApp methods, we can just refer to the module's logger, like logger.debug("message"). However, if the app is an instance, perhaps having an instance logger is better, but probably not necessary. Let me see.
- In the code provided, the existing print statements are inside methods of ReclaimedApp. So replacing those with logger.debug would be better. But to enable debug mode, the logger's level should be set to DEBUG when needed.
- So, the steps:
- Add the logging module.
- Create a module-level logger, perhaps with the name "reclaimed.textual_app".
- Add a function to configure the logger based on a debug flag passed to run_textual_ui.
- In run_textual_ui, accept debug as a parameter and call configure_logging(debug).
- Replace all print statements with logger.debug, info, error, etc., as appropriate.
- Add logging statements in key places like when starting scans, completing scans, errors during scan, deletions, etc.
- Now, let's go through each part of the code to see where to add logs.
- Starting with the ProgressManager class. The init could log an info message when initialized.
- Then, in the ReclaimedApp init:
- When initializing, log that the app is starting with the given path, options, etc.
- In scan_directory():
- The notify and reset state should be logged as info, like "Starting directory scan...", "Resetting state before scan"
- The finally block: if there was an error, log the exception.
- In _scan_directory_worker():
- The try block: log when entering, any progress steps, and also when exceptions occur.
- The except clause catches exceptions and shows a notify. So log the exception as error here.
- The finally clause: also log the completion or final result.
- In on_worker_state_changed:
- For WorkerState.SUCCESS, log the completion time and file count.
- For WorkerState.ERROR, log that the scan failed.
- In action_delete_selected:
- When the item is selected, log the action (info). When deletion starts, maybe debug. When an error occurs during deletion, log that as error.
- Also, the handle_confirmation function is a nested function, so need to make sure it uses the app's logger or pass it.
- Wait, in the action_delete_selected method, the handle_confirmation function is defined inside. Since it's a closure, it can access the logger from the app instance. Alternatively, perhaps better to log at the app level.
- But in the current code, the handle_confirmation function is called when the confirmation dialog is dismissed, so inside that function, after the user confirms, we can log the deletion attempt.
- In the ConfirmationDialog's methods, maybe not needed unless there's some state to log, but probably not necessary unless there are errors in the dialog itself.
- The SortOptions dialog might have some logs when applied or canceled, but again, unless there's an error, maybe just info messages for sorting.
- Additionally, the existing check_header_visibility has debug prints. Since this is a debug helper, those should be moved to logger.debug.
- The help method's notify can be turned into an info log.
- Also, in the ProgressManager class, perhaps add some debug messages tracking progress state, but since it's a helper, maybe not necessary unless it's involved in critical logic.
- The ProgressManager's current methods don't have any code, but maybe in the future, so perhaps just initializing it logs info.
- Now, let's structure the configuration.
- The configure_logging function can set the logger's level, format, and handlers.
- The standard practice is to have a basicConfig in the module, but perhaps better to setup a handler here to have more control.
- Wait, in Python, it's better to avoid configuring logging at the module level with basicConfig, because if the module is imported elsewhere, it might interfere. Instead, have the configure_logging function add a console handler, set the level, and format.
- So:
- def configure_logging(debug: bool):
- logger = logging.getLogger("reclaimed.textual_app")
- logger.setLevel(logging.DEBUG if debug else logging.INFO)
- (the function body continues:)
- handler = logging.StreamHandler()
- handler.setLevel(logging.DEBUG if debug else logging.INFO)
- formatter = logging.Formatter(
- fmt="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S"
- )
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- Wait, but the first time configure_logging is called, adding the handler each time would cause duplicate messages. To avoid that, perhaps check if the logger already has handlers before adding.
- Alternatively, the configure_logging function can be called once, and each time it removes existing handlers and resets, but that might not be needed. The user is expecting logging in this script, so if they are running the app standalone, maybe it's okay. But to be safe, check existing handlers and only add if needed.
- Hmm, but in the current code, the ReclaimedApp is an App that runs the Textual event loop. So perhaps the logging configuration is best done before starting the app, so in run_textual_ui, after the imports.
- Wait, the run_textual_ui is the entry point, so it would be best to configure logging there based on the debug flag.
- Wait, in the provided code, the run_textual_ui is the function that's called to start the app. So the configure_logging can be called inside run_textual_ui, based on the debug parameter which is passed to it.
- So first, adjust the run_textual_ui parameters:
- def run_textual_ui(
- path: Path,
- max_files: int = 100,
- max_dirs: int = 100,
- skip_dirs: list[str] = None,
- debug: bool = False,
- ) -> None:
- Then, in the beginning of run_textual_ui:
- configure_logging(debug)
- That way, the debug flag enables the debug logs.
- Now, define the configure_logging function.
- Wait, but I should also ensure that the logger's name is correct. Let me create a logger for the module.
- Wait, the module is called textual_app, so the logger should be named "reclaimed.textual_app", following the standard `logging` convention of dotted, module-based logger names (what `logging.getLogger(__name__)` would produce for this module).
- So in the module:
- logger = logging.getLogger("reclaimed.textual_app")
- def configure_logging(debug: bool):
- # Check if handlers already exist to avoid duplicates
- if not logger.handlers:
- handler = logging.StreamHandler()
- formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- # Set the logger's level
- logger.setLevel(logging.DEBUG if debug else logging.INFO)
- Wait, but the handler's level is also set. Maybe it's better to set handler's level to the same as the logger's, but actually, the logger's level is the main one. The handler's level can be left at its default (NOTSET) so that it logs all messages from the logger's level. So perhaps only set the logger's level, and the handler's level can be omitted. Because the handler's default is to log everything from the logger's level. Wait, no, the handler has its own level. So perhaps better to set handler.setLevel to NOTSET, so that it passes all messages from the logger. Or set the handler level same as the logger.
- Alternatively, the handler's level can be set to DEBUG, but the logger's level will control the filtering. Let me make sure that the handler's level doesn't restrict the messages.
- Wait, the handler's level is the lowest level it will accept. So, if the logger is set to DEBUG and the handler to INFO, then debug messages won't be logged. So to be safe, set the handler's level to DEBUG, and the logger's level to the desired level (DEBUG or INFO). Or set both to the same level.
- To be precise: a record is first filtered by the logger's effective level, and then each handler applies its own level independently. A handler whose level is NOTSET (the default for a StreamHandler) does not filter at all, so in that case only the logger's level matters; but if a handler's level is set higher than the logger's, it can still suppress messages. To avoid that, keep the handler's level no higher than the lowest level you want emitted.
- Alternatively, set the handler's level to the lowest possible (DEBUG), so that the handler will pass all messages, but the logger's level determines which messages to emit. So that's better. Because the handler can filter, but in this case, if the user wants to see debug messages, then the logger's level is set to DEBUG, and the handler is set to DEBUG. Otherwise, the logger is set to INFO, and so on.
- Wait, perhaps the best way is:
- def configure_logging(debug: bool):
- logger.setLevel(logging.DEBUG)
- handler = logging.StreamHandler()
- handler.setLevel(logging.DEBUG)
- formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- if not debug:
- logger.setLevel(logging.INFO)
- Wait, but this might be better. Set the logger to DEBUG, handler to DEBUG, then if not debug, change the logger's level to INFO. Alternatively, set the logger's level to the appropriate value.
- Wait, perhaps:
- def configure_logging(debug: bool):
- # Get the root logger? Or the module's logger?
- # We already have the module's logger as logger (instance named 'logger' earlier)
- # So check if it's configured.
- if not logger.handlers:
- handler = logging.StreamHandler()
- handler.setLevel(logging.DEBUG) # Handler will accept all messages
- formatter = logging.Formatter(
- "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S"
- )
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- # Set the logger to appropriate level
- logger.setLevel(logging.DEBUG if debug else logging.INFO)
- This way, the first time configure_logging is called, it adds the console handler with DEBUG level, then sets the logger level to either DEBUG or INFO. So messages below that level are not emitted.
- Now, replacing the existing print statements in check_header_visibility with logger.debug:
- Original code inside check_header_visibility:
- print(f"DEBUG: dirs_header visible: {dirs_header.styles.display}")
- print(f"DEBUG: files_header visible: {files_header.styles.display}")
- print(f"DEBUG: dirs_header text: {dirs_header.render()}")
- print(f"DEBUG: files_header text: {files_header.render()}")
- These can become:
- logger.debug(f"dirs_header visible: {dirs_header.styles.display}")
- logger.debug(f"files_header visible: {files_header.styles.display}")
- logger.debug(f"dirs_header text: {dirs_header.render()}")
- logger.debug(f"files_header text: {files_header.render()}")
- Wait, but the print statements start with "DEBUG:", which is redundant if using logger.debug. The log level is already debug, so the message will be logged with the level.
- So just the messages without the prefix. Also, in the except block of check_header_visibility, the print statement can be:
- logger.exception("Error checking headers")
- Wait, but the original code is:
- except Exception as e:
- print(f"DEBUG: Error checking headers: {e}")
- So maybe logger.error(f"Error checking headers: {e}").
- Hmm. Also, the _add_row_to_table has a try-except for relative_to. That could log a warning or error if the path can't be made relative.
- In _add_row_to_table:
- except ValueError:
- rel_path = item_info.path
- Perhaps log a warning here: e.g., logger.warning(f"Path {item_info.path} is not relative to {self.path}; using absolute path.")
- Now, thinking about which other places to log.
- In init of ReclaimedApp:
- Log when initializing the app, including the path and options.
- So:
- def init(
- self, path: Path, options: ScanOptions, on_exit_callback: Optional[Callable] = None
- ):
- super().init()
- self.logger = logger # assuming the module's logger is accessible here
- self.path = path.resolve()
- self.options = options
- self.on_exit_callback = on_exit_callback
- self.scanner = DiskScanner(options)
- self.largest_files = []
- self.largest_dirs = []
- self.current_focus = "files"
- self.sort_method = "sort-size"
- self.progress_manager = None
- self.logger.info(f"Initialized ReclaimedApp for path {self.path} with options {options}")
- Wait, since ReclaimedApp is an instance, perhaps each instance can have its own logger? Or better to use the module's logger. In this case, since the module's logger is already set up, the app can just use that, but in init we can log an info message.
- Yes, in ReclaimedApp's init, log an info message about initialization.
- Now, in scan_directory() method:
- The lines that set self.largest_files and dirs to [] could be logged as info.
- self.logger.info("Starting scan_directory; resetting files and directories lists")
- The notify("Starting directory scan...", ...) can be logged as info:
- self.logger.info("Starting directory scan...")
- The try-except-finally in the _scan_directory_worker:
- Inside the try block:
- async for progress in self.scanner.scan_async(self.path):
- if not progress:
- continue
- self.logger.debug(f"Received progress update: files={len(progress.files)}, dirs={len(progress.dirs)}, scanned={progress.scanned}")
- In the except block:
- except Exception as e:
- self.logger.error(f"Scan error occurred: {e}", exc_info=True)
- self.notify(f"Scan error: {str(e)}", severity="error")
- raise
- The finally block:
- if progress is not None:
- # ... updating tables. Maybe add a debug log here.
- Finally, after waiting for the timer task:
- self.logger.debug("Worker task completed, cleaning up timer")
- The return statement at the end of the worker can log the final result:
- self.logger.info("Scan completed successfully. Results: files %d, directories %d, total size %d bytes, file count %d", len(self.largest_files), len(self.largest_dirs), progress.total_size, progress.file_count)
- Wait, but in the finally section of _scan_directory_worker, there is a return statement with the result. So in the finally block, after processing, log the final result.
- Wait in the finally block:
- Return final data
- return {
- "files": self.largest_files,
- "dirs": self.largest_dirs,
- "total_size": self.scanner._total_size,
- "file_count": self.scanner._file_count,
- }
- I can log the return there:
- self.logger.info("Scan completed with final data: files=%d, dirs=%d, total_size=%d, file_count=%d", len(self.largest_files), len(self.largest_dirs), self.scanner._total_size, self.scanner._file_count)
- Also, in the on_worker_state_changed method:
- if event.worker.state == WorkerState.SUCCESS:
- self.logger.info("Scan task completed successfully. Took %.1f seconds. Found %d files.", elapsed, file_count)
- elif event.worker.state == WorkerState.ERROR:
- self.logger.error("Scan task failed.", exc_info=True)
- In the action_delete_selected:
- When selecting the path:
- if confirmed:
- logger.info(f"Deleting {path} (directory: {is_dir})")
- try:
- if is_dir:
- shutil.rmtree(path)
- else:
- os.remove(path)
- self.logger.info(f"Successfully deleted {path}")
- except Exception as e:
- self.logger.error(f"Error deleting {path}: {str(e)}", exc_info=True)
- self.notify(f"Error deleting {path}: {e}", timeout=5)
- Wait, but in the handle_confirmation function, which is a nested function, how do we access the app's logger?
- Wait, handle_confirmation is a function defined inside action_delete_selected. Since it's a closure, it can access the self of the app.
- Yes, because the function is defined inside the method, so self is accessible.
- Wait, the app is ReclaimedApp instance, so in handle_confirmation:
- def handle_confirmation(confirmed: bool) -> None:
- if confirmed:
- try:
- if is_dir:
- shutil.rmtree(path)
- else:
- os.remove(path)
- self.logger.info(f"Successfully deleted {path}") # Here, self refers to the outer ReclaimedApp instance
- except Exception as e:
- self.logger.error(f"Error deleting {path}: {str(e)}", exc_info=True)
- self.notify(...)
- Wait, but the function is nested inside the method, so the variables from the outer scope are available, including self. So yes, that should work.
- So, in action_delete_selected's handle_confirmation function, use self.logger.
- Now, the help function:
- In action_help, the notify is called with help_text. Maybe log that the help was shown.
- self.logger.info("User accessed help menu")
- But since the help text is detailed, maybe just log an info message about the help being displayed.
- Additionally, in the on_data_table_row_selected event, logging the selection. Maybe a debug log when a row is selected.
- def on_data_table_row_selected(self, event) -> None:
- ...
- self.logger.debug(f"Row selected in table {table_id}: path={path}")
- Also, in the focus_active_table method, log when focus is set.
- def focus_active_table(self) -> None:
- table_id = "#files-table" if self.current_focus == "files" else "#dirs-table"
- table = self.query_one(table_id)
- self.logger.debug(f"Setting focus to {table_id}")
- # Only set focus if the table has rows
- if len(table.rows) > 0:
- self.set_focus(table)
- # Set cursor to first row if not selected
- if table.cursor_coordinate is None:
- table.move_cursor(row=0, column=0)
- self.logger.debug(f"Set cursor to first row in {table_id}")
- Now, the ProgressManager class. Since it's initialized in on_mount, perhaps log when created.
- In ReclaimedApp's on_mount:
- self.progress_manager = ProgressManager(...)
- self.logger.debug("ProgressManager initialized for container main-container")
- Also, in ProgressManager's init:
- def init(self, app: App, container_id: str):
- super().init()
- self.logger = app.logger # Assuming app has a logger attribute. Wait, the ReclaimedApp has a logger, but in the init, we set self.logger = logger (the module logger). Alternatively, if the ProgressManager is given the app, then it can access app.logger. So yes.
- Wait, ReclaimedApp's init does not have a self.logger yet. Wait, in the ReclaimedApp's init as I suggested earlier, set self.logger = logger (the module's logger). But actually, the module's logger is already available via the module's name. So maybe ReclaimedApp can just use the module's logger, but in the init we might want to log something, so perhaps ReclaimedApp's init can log to the module's logger. Alternatively, each instance can have its own logger, but that's more complicated. The simplest is to use the module's logger.
- Wait, the module's logger is at the top, so in ReclaimedApp's init, I can access that logger via logging.getLogger("reclaimed.textual_app"), but perhaps it's better to inject it. Hmm, but maybe it's easier to use the module's logger directly. Let me see:
- Wait, in the code I added earlier, the module has a logger variable which is the logger for "reclaimed.textual_app". So inside ReclaimedApp's init, self can refer to that logger. Wait no, the ReclaimedApp's instance doesn't have a reference unless we set it. So in init:
- self.logger = logger
- So in the module:
- logger = logging.getLogger("reclaimed.textual_app")
- Then, in ReclaimedApp's init, set self.logger = logger. Wait, but in each ReclaimedApp instance, this would point to the same logger. That's okay, since the logger is module-scoped. But that's the same as just using logger directly. Hmm, perhaps it's better to use the module's logger directly in all methods. Although, perhaps using self.logger to have clearer code. Let me proceed with that.
- Wait, the module's logger is accessible as logger, so in ReclaimedApp, we can either use that variable, or in init set self.logger = logging.getLogger("reclaimed.textual_app").
- Alternatively, the ProgressManager should use the app's logger, so in its init, self.app.logger.
- Yes, perhaps better to have ProgressManager use the app's logger.
- So in ProgressManager's init:
- logger = self.app.logger
- Wait, but in the ProgressManager's init, app is passed, so self.app is the ReclaimedApp instance. So yes, the ProgressManager can use self.app.logger for logging.
- So in ProgressManager's init:
- def init(self, app: App, container_id: str):
- super().init()
- self.app = app
- self.container_id = container_id
- self.last_update_time = 0
- self.update_interval = 0.1
- self.last_progress_value = 0
- self.min_progress_increment = 0.005
- self.app.logger.debug(f"ProgressManager initialized for container {container_id}")
- Wait, but the ProgressManager doesn't have much functionality yet. Maybe no logs here. Let me see the code again. The ProgressManager is a class that's initialized in on_mount, but in the existing code, it's just a container with some variables. Unless there are methods in ProgressManager that need logging, but in the current code, ProgressManager doesn't have any methods. Wait, looking back at the code, the ProgressManager class is just a placeholder, but in the script provided, it's there but not used. Wait, the ProgressManager is mentioned in the ReclaimedApp's init but in the current code, it's initialized, but no methods are called. Wait, maybe the user expects that in the future, the ProgressManager will manage progress bars. But in the current code, the loading indicator is in the footer, so maybe the ProgressManager isn't actually used yet. However, the user's code includes it, so perhaps the intention is to use it later. But since the code as given doesn't use it, maybe I can leave logs in ProgressManager for when it's implemented, but for now, just add the debug log in init.
- Anyway, moving forward.
- Now, let's go through the entire script and convert all print statements to logging.debug statements, and add other log messages where appropriate.
- Also, the existing exception in the finally clause of _scan_directory_worker is not being caught. Wait, inside the finally block, when trying to await the cancelled task, the except clause catches async's CancelledError, which is okay, but the code raises that error? Or the worker is failing due to the exception in the try block.
- Wait, in _scan_directory_worker, the try block has except and finally clauses. The except clause is for the generator's exceptions, and the finally will be called regardless. But in the finally, after the try-except, there's a return statement. Wait, the code inside the finally block in the worker function:
- The code inside finally is:
- try:
- await self._timer_task
- except asyncio.CancelledError:
- pass
- but self._timer_task was created with asyncio.create_task. So when the worker is being cancelled, the task is cancelled, and that's handled here. However, the error in the try block (if any) will have been raised again after the except clause.
- Now, in the code, the existing ProgressManager isn't used, so perhaps that's a placeholder. The user might expect logs for it, but since it's not used yet, maybe just add a log in its init.
- Now, let's think about the main steps:
- Add module-level logger.
- Create a configure_logging function that is called in run_textual_ui based on debug flag.
- Replace all print statements with logger.debug.
- Add info logs at the start of scan, end, errors.
- Add error logs for exceptions during deletion.
- Also, log when the app unmounts, so in on_unmount, log that, and if there's an on_exit_callback, log that it's being called.
- Now, let me check the existing code again for other possible places. For example, in the action_sort method:
- def action_sort(self) -> None:
- def handle_sort_result(sort_option: Optional[str]) -> None:
- if sort_option:
- self.sort_method = sort_option
- self.apply_sort(sort_option)
- self.update_tables()
- self.focus_active_table()
- self.push_screen(SortOptions(), handle_sort_result)
- So when the user opens the sort dialog, maybe log that. When the result is applied, log the new sort method.
- In handle_sort_result:
- self.logger.info(f"Applying sort method: {sort_option}")
- Also, in action_refresh, log that the user is refreshing.
- self.logger.info("User requested to refresh the scan")
- Now, the existing notifications can also be logged as info. For example, in scan_directory:
- self.notify("Starting directory scan...", timeout=2)
- So log that as:
- self.logger.info("Starting directory scan...")
- Same for others.
- Now, let's structure all these changes step by step.
- First, adding the logger and configure_logging function at the top:
- At the top of the script (after imports):
- import logging
- logger = logging.getLogger("reclaimed.textual_app")
- def configure_logging(debug: bool):
- # Check if no handlers are present
- if not logger.handlers:
- # Create a console handler and set level to debug
- handler = logging.StreamHandler()
- handler.setLevel(logging.DEBUG)
- # Create formatter
- formatter = logging.Formatter(
- fmt="%(asctime)s [%(levelname)s] %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S"
- )
- # Add formatter to handler
- handler.setFormatter(formatter)
- # Add handler to the logger
- logger.addHandler(handler)
- # Set the logger's level
- logger.setLevel(logging.DEBUG if debug else logging.INFO)
- Wait, but the handler's level is set to DEBUG, so it will accept all messages. The logger's effective level is set to the desired level (DEBUG or INFO). So messages below that level won't be emitted.
- This way, when debug is True, the logger's level is DEBUG, so all debug, info, warn, etc., messages will be shown. When debug is False, only info and above.
- Now, in run_textual_ui, after the imports and before creating the app, call configure_logging.
- def run_textual_ui(
- path: Path,
- max_files: int = 100,
- max_dirs: int = 100,
- skip_dirs: Optional[list[str]] = None,
- debug: bool = False,
- ) -> None:
- if skip_dirs is None:
- skip_dirs = [".Trash", "System Volume Information"]
- configure_logging(debug)
- options = ScanOptions(max_files=max_files, max_dirs=max_dirs, skip_dirs=skip_dirs)
- app = ReclaimedApp(path, options)
- app.run()
- But wait, in the ReclaimedApp init:
- def init(
- self, path: Path, options: ScanOptions, on_exit_callback: Optional[Callable] = None
- ):
- super().init()
- self.logger = logger # So that the class can use the logger
- # ... existing code
- Wait, but that's okay, but actually, since logger is a module-level variable, each app instance can access it via the module. Alternatively, better to have the app use its own logger, but using the module level is fine.
- Wait, but the init of the app can also initialize its own logger, perhaps with a more specific name. But since the module's logger is already set up, maybe it's redundant. Alternatively, the ReclaimedApp can have its own logger instance. Let me think that using the module's logger is sufficient, as all the logs are from the same module.
- So in ReclaimedApp's init, just set self.logger = logger so that all instance methods can use self.logger.
- Wait, but in the init method, the self.logger = logger (the module's logger). So that's okay.
- Now, replacing the existing print statements in check_header_visibility:
- Original:
- print(f"DEBUG: dirs_header visible: {dirs_header.styles.display}")
- print(f"DEBUG: files_header visible: {files_header.styles.display}")
- print(f"DEBUG: dirs_header text: {dirs_header.render()}")
- print(f"DEBUG: files_header text: {files_header.render()}")
- Change to:
- self.logger.debug(f"dirs_header visible: {dirs_header.styles.display}")
- self.logger.debug(f"files_header visible: {files_header.styles.display}")
- self.logger.debug(f"dirs_header text: {dirs_header.render()}")
- self.logger.debug(f"files_header text: {files_header.render()}")
- Also, the except block:
- except Exception as e:
- self.logger.error(f"Error checking headers: {e}")
- Now, in the _add_row_to_table method:
- def _add_row_to_table(self, table, item_info: FileInfo) -> None:
- try:
- rel_path = item_info.path.relative_to(self.path)
- except ValueError:
- rel_path = item_info.path
- self.logger.warning(f"Path {item_info.path} is not under scanned directory {self.path}; using absolute path.")
- storage_status = "☁️ iCloud" if item_info.is_icloud else "💾 Local"
- storage_cell = Text(storage_status, style="#268bd2" if item_info.is_icloud else "#859900")
- table.add_row(
- format_size(item_info.size),
- storage_cell,
- str(rel_path),
- key=str(item_info.path)
- )
- Wait, the original code's except ValueError block in _add_row_to_table didn't log anything, now adding a warning.
- Now, in scan_directory():
- def scan_directory(self) -> None:
- self.logger.info("Starting scan_directory; resetting files and directories lists")
- self.largest_files = []
- self.largest_dirs = []
- self.start_time = time.monotonic()
- self.logger.info("Starting directory scan...")
- loading = self.query_one("#scan-progress")
- loading.styles.display = "block"
- self.logger.debug("Showing progress indicator")
- self.scan_task = self.run_worker(
- self._scan_directory_worker(),
- name="Directory Scanner",
- description="Scanning directory...",
- )
- self.logger.debug("Scan worker started")
- Wait, the existing notify is replaced with logger.info. Also, adding a debug log about starting the worker.
- In the _scan_directory_worker:
- async def _scan_directory_worker(self):
- # ... existing code
- last_ui_update = 0
- base_ui_update_interval = 0.5
- self.logger.debug("Starting scan worker")
- # async def update_timer() inside:
- async def update_timer():
- start = time.monotonic()
- while True:
- elapsed = time.monotonic() - start
- minutes, seconds = divmod(int(elapsed), 60)
- timer_display.update(f"Time: {minutes:02d}:{seconds:02d}")
- await asyncio.sleep(0.05)
- self._timer_task = asyncio.create_task(update_timer())
- self.logger.debug("Started timer task")
- try:
- async for progress in self.scanner.scan_async(self.path):
- if not progress:
- continue
- self.logger.debug(f"Received progress update: scanned {progress.scanned} items, files buffer {len(files_buffer)}, dirs buffer {len(dirs_buffer)}")
- # ... existing code
- except Exception as e:
- self.logger.error("Error during scan: %s", e, exc_info=True)
- self.notify(f"Scan error: {str(e)}", severity="error")
- raise
- finally:
- # ... existing code
- # Return final data
- result = {
- "files": self.largest_files,
- "dirs": self.largest_dirs,
- "total_size": self.scanner._total_size,
- "file_count": self.scanner._file_count,
- }
- self.logger.info("Scan completed. Final results: %d files, %d directories, total size %d bytes",
- len(result["files"]), len(result["dirs"]), result["total_size"])
- return result
- In the finally block, after the try-except, log the final result.
- Now, in on_worker_state_changed for WorkerState.SUCCESS:
- async def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
- if event.worker.name != "Directory Scanner":
- return
- loading = self.query_one("#scan-progress")
- if event.worker.state == WorkerState.SUCCESS:
- loading.styles.display = "none"
- self.logger.info("Scan task finished successfully")
- if event.worker.result:
- result = event.worker.result
- file_count = result.get("file_count", 0)
- elapsed = time.monotonic() - self.start_time
- self.logger.info("Scan completed in %.1f seconds. Total files scanned: %d", elapsed, file_count)
- # Update final count
- count_display.update(f"Files: {file_count:,}")
- # Show notification
- self.notify(f"Scan complete in {elapsed:.1f}s. Found {file_count:,} files.", timeout=5)
- elif event.worker.state == WorkerState.ERROR:
- loading.styles.display = "none"
- self.logger.error("Scan task failed with an error")
- self.notify("Scan failed!", severity="error")
- Also, in the case of error, log that.
- In the action_delete_selected:
- def action_delete_selected(self) -> None:
- table = self.query_one("#files-table" if self.current_focus == "files" else "#dirs-table")
- if table.cursor_coordinate is not None:
- row = table.cursor_coordinate.row
- if row < len(table.rows):
- if self.current_focus == "files":
- path = self.largest_files[row].path
- elif self.current_focus == "dirs":
- path = self.largest_dirs[row].path
- else:
- self.logger.error("Unexpected current_focus value: %s", self.current_focus)
- return
- is_dir = path.is_dir()
- self.logger.debug(f"Selected item for deletion: {path} (directory: {is_dir})")
- # ... push screen with confirmation dialog.
- def handle_confirmation(confirmed: bool) -> None:
- if confirmed:
- try:
- if is_dir:
- shutil.rmtree(path)
- else:
- os.remove(path)
- self.logger.info(f"Successfully deleted {path}")
- self.notify(f"Successfully deleted {path}", timeout=5)
- except Exception as e:
- self.logger.error(f"Error deleting {path}: {str(e)}", exc_info=True)
- self.notify(f"Error deleting {path}: {e}", timeout=5)
- else:
- self.logger.debug("Deletion canceled by user")
- self.push_screen(ConfirmationDialog(path, is_dir), handle_confirmation)
- Wait, in the handle_confirmation function, the self refers to the ReclaimedApp instance, so the logger can be used.
- Wait, but in the closure, variables from the outer scope (like self) are accessible. So yes, that's okay.
- Also, if the current_focus is unexpected, log an error.
- Now, in action_sort:
- def action_sort(self) -> None:
- self.logger.debug("Opening sort dialog")
- def handle_sort_result(sort_option: Optional[str]) -> None:
- if sort_option:
- self.sort_method = sort_option
- self.logger.info(f"Sort method changed to {sort_option}")
- self.apply_sort(sort_option)
- self.update_tables()
- self.focus_active_table()
- else:
- self.logger.debug("Sort dialog canceled by user")
- self.push_screen(SortOptions(), handle_sort_result)
- In action_refresh:
- def action_refresh(self) -> None:
- self.logger.info("User initiated refresh; restarting scan")
- self.scan_directory()
- action_help:
- def action_help(self) -> None:
- self.logger.debug("User accessed help menu")
- help_text = """
- [HELP TEXT HERE]
- """
- self.notify(help_text, timeout=10)
- In on_mount:
- def on_mount(self) -> None:
- self.logger.debug("App mounted")
- self.progress_manager = ProgressManager(self, "main-container") # which would log in its init? Let's see.
- Wait, in the ProgressManager's init(), I can add:
- def init(self, app: App, container_id: str):
- super().init()
- self.app = app
- self.container_id = container_id
- self.logger = app.logger # To get the logger from the App instance
- self.last_update_time = 0
- self.update_interval = 0.1
- self.last_progress_value = 0
- self.min_progress_increment = 0.005
- self.logger.debug(f"ProgressManager initialized for container '{container_id}'")
- So now, ProgressManager has its own logger, which is the same as the app's.
- Now, in the ReclaimedApp's on_unmount:
- def on_unmount(self) -> None:
- self.logger.info("Application unmounting")
- if self.on_exit_callback:
- self.logger.debug("Calling on_exit_callback")
- self.on_exit_callback()
- Now, check all methods.
- Also, the init of ReclaimedApp:
- def init(
- self, path: Path, options: ScanOptions, on_exit_callback: Optional[Callable] = None
- ):
- super().init()
- self.logger = logger # module's logger
- self.path = path.resolve()
- self.options = options
- self.on_exit_callback = on_exit_callback
- self.scanner = DiskScanner(options)
- self.largest_files = []
- self.largest_dirs = []
- self.current_focus = "files" # Tracks which table has focus
- self.sort_method = "sort-size" # Default sort method
- self.progress_manager = None
- self.logger.info(f"Initialized ReclaimedApp for path {self.path} with options {options}")
- Wait, but the options might have sensitive data? Probably not, so logging the options is okay.
- Now, the compose functions for the screens (ConfirmationDialog and SortOptions). Do they need logging? Probably only if an error occurs, but the existing code doesn't have any except blocks there.
- In ConfirmationDialog's init:
- def init(self, item_path: Path, is_dir: bool = False):
- super().init()
- self.item_path = item_path
- self.is_dir = is_dir
- self.item_type = "directory" if is_dir else "file"
- self.logger.debug(f"ConfirmationDialog initialized for {self.item_type} at {item_path}")
- Wait, but the ConfirmationDialog is a ModalScreen instance. Since it's part of Textual's Screens, does it have a way to access the app's logger? Or perhaps use the module's logger.
- Alternatively, perhaps the ModalScreen classes can get the logger from the app when they are pushed. But in their init, they don't have access to the App. So maybe better to use the module's logger for them as well.
- Wait, but the module's logger is available as a module variable, so inside the ConfirmationDialog and SortOptions, they can just use the logger variable from the module.
- Wait, but to avoid confusion, perhaps it's better to have them use the module's logger.
- Wait, in the code, the module has a logger variable, so the ConfirmationDialog can do:
- from the parent module, but since they are in the same file, they can access logger.
- Wait, in the module, the logger is available as the variable logger, so inside ConfirmationDialog's init, add:
- logger.debug(f"ConfirmationDialog initialized for {self.item_type} at {self.item_path}")
- But that's okay, but need to make sure that the logger is properly configured when these classes are used.
- Alternatively, perhaps for consistency, when the dialog is pushed, but maybe the logger is already set up in run_textual_ui.
- Yes, since configure_logging is called when run_textual_ui is executed, the logger is set up before any instances are created, so the ConfirmationDialog can use the module's logger directly.
- Same for SortOptions.
- So in SortOptions's compose:
- def compose(self) -> ComposeResult:
- logger.debug("Starting SortOptions dialog")
- # ... existing code.
- But maybe adding a log in the init or compose of SortOptions:
- def __init__(self):
- super().__init__()
- logger.debug("SortOptions dialog initialized")
- But not sure. The main logs are probably sufficient.
- Now, in the ProgressManager class, since it's not being used yet, but it's part of the code, maybe add logs there when it's actually used. But as per the current code, the ProgressManager is only initialized, but no methods called. So perhaps that's okay.
- Wait, looking at the code, the existing ProgressManager is not being used, but the user might have intended to use it. Since the problem says to implement logging with optional debug mode, but the ProgressManager code might be a placeholder. But I should at least log its initialization.
- Now, let me go through all the code, replacing print statements with logs and adding log statements where appropriate.
- In the _update_table_if_changed function:
- def _update_table_if_changed(self, table_id: str, items: List[FileInfo]) -> None:
- if not items:
- return
- current_items = self._last_table_items.get(table_id, [])
- check_count = min(5, len(items))
- items_changed = False
- markdown
- Copy
- for i in range(check_count):
- if (i >= len(current_items) or items[i].path != current_items[i].path or items[i].size != current_items[i].size):
- items_changed = True
- break
- if not items_changed:
- return
- self._last_table_items[table_id] = items
- self.logger.debug(f"Updating table {table_id} with new items")
- In the _update_table:
- def _update_table(self, table_id: str, items: List[FileInfo]) -> None:
- table = self.query_one(table_id)
- table.clear()
- self.logger.debug(f"Clearing table {table_id}")
- markdown
- Copy
- display_items = items[: min(100, len(items))]
- self.logger.debug(f"Updating table {table_id} with {len(display_items)} items")
- for item_info in display_items:
- self._add_row_to_table(table, item_info)
- Now, the debug logs track when the tables are being updated and cleared.
- Also, in the apply_sort method:
- def apply_sort(self, sort_method: str) -> None:
- if not self.largest_files and not self.largest_dirs:
- return
- markdown
- Copy
- if sort_method == self._current_sort_method and self._files_sorted and self._dirs_sorted:
- return
- self.logger.debug(f"Applying sort method {sort_method}")
- # ... rest of the code.
- Now, all the key points have logging statements.
- Testing edge cases: when debug is True, the log messages will include debug, info, etc. When False, only info and above (warning, error, critical) will be shown.
- The user may also want to log file deletions, which is covered in action_delete_selected's handle_confirmation.
- Also, in the worker's finally clause, if progress is not None, log the final results.
- Wait, in the finally block of _scan_directory_worker:
- if progress is not None:
- # code to process and update tables
- # and then after:
- self.logger.info("Scan finished with final data: %d files, %d directories",
- len(self.largest_files), len(self.largest_dirs))
- Wait, but in the code, after the loop, there's the return statement. So perhaps in the finally block, before returning:
- if progress is not None:
- # ... code to update tables
- self.logger.info("Scan completed after worker task")
- Wait, perhaps better to log in the finally regardless, but need to check.
- Alternatively, in the finally block after the try, before the return, log the final data.
- So:
- Finally:
- # ... code
- result = ...
- self.logger.info("Scan completed. Files: %d, Directories: %d, Total Size: %d, File Count: %d",
- len(result["files"]), len(result["dirs"]), result["total_size"], result["file_count"])
- return result
- This would give a clear log when the scan finishes.
- Now, the main part. But the user might not have one here, as the script is a module. However, the run_textual_ui is the entry point.
- Putting it all together, the final code would have the logger and configure_logging added, all print statements removed and replaced with debug logs, and info/warning/error logs added where appropriate.
- I should also check that the logger is named correctly. The user's original code may have other modules, so using "reclaimed.textual_app" is appropriate.
- Testing if the logs are properly configured. The configure_logging is called once in run_textual_ui with the debug flag, which sets the right level.
- Now, putting all these changes into the script.
- </think>
- To implement high-quality logging with an optional debug mode in the provided script, we need to add proper logging capabilities and replace existing debug print statements with logging calls. Here's the modified code with these changes:
- python
- Copy
- """Textual UI for reclaimed with interactive file/folder management."""
import asyncio
import logging
import os
import shutil
import time
from pathlib import Path
from typing import Callable, List, Optional

from rich.text import Text
from textual import on
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal
from textual.screen import ModalScreen
from textual.widgets import (
    Button,
    DataTable,
    Footer,
    Header,
    LoadingIndicator,
    RadioButton,
    RadioSet,
    Static,
)
from textual.worker import Worker, WorkerState

from ..core import DiskScanner, FileInfo, ScanOptions
from ..utils.formatters import format_size
from .styles import TEXTUAL_CSS
logger = logging.getLogger("reclaimed.textual_app")


def configure_logging(debug: bool):
    """Configure application-wide logging.

    A single stream handler is attached on the first call (subsequent calls
    reuse it, so no duplicate output is produced). The logger level is set to
    DEBUG when *debug* is true, otherwise INFO.

    Args:
        debug: Enable debug-level logging output.
    """
    if not logger.handlers:
        stream_handler = logging.StreamHandler()
        # The handler passes everything through; the logger's own level is
        # what actually gates the output.
        stream_handler.setLevel(logging.DEBUG)
        stream_handler.setFormatter(
            logging.Formatter(
                fmt="%(asctime)s [%(levelname)s] %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        logger.addHandler(stream_handler)
    logger.setLevel(logging.DEBUG if debug else logging.INFO)
class ProgressManager:
    """Manages progress bar lifecycle to prevent duplicate IDs and provide smoother updates."""

    def __init__(self, app: App, container_id: str):
        """Record the host app plus throttling settings for progress updates.

        Args:
            app: The parent Textual app.
            container_id: ID of the container that progress bars mount into.
        """
        super().__init__()
        self.app = app
        self.container_id = container_id
        # Throttling state: refresh at most ten times a second, and only when
        # the reported value has moved by at least half a percent.
        self.last_update_time = 0
        self.last_progress_value = 0
        self.update_interval = 0.1
        self.min_progress_increment = 0.005
        logger.debug(f"ProgressManager initialized for container '{container_id}'")
class ConfirmationDialog(ModalScreen):
    """A modal dialog for confirming file/folder deletion."""

    def __init__(self, item_path: Path, is_dir: bool = False):
        """Store the deletion target.

        Args:
            item_path: Filesystem path that would be deleted.
            is_dir: True when the target is a directory (recursive delete).
        """
        super().__init__()
        self.item_path = item_path
        self.is_dir = is_dir
        # Human-readable noun reused in the dialog text and log messages.
        self.item_type = "directory" if is_dir else "file"
        logger.debug(f"ConfirmationDialog initialized for {self.item_type} at {item_path}")

    def compose(self) -> ComposeResult:
        """Compose the confirmation dialog."""
        logger.debug("Composing ConfirmationDialog")
        with Container(id="dialog-container"):
            yield Static("[bold]Confirmation Required[/bold]", id="dialog-title")
            yield Static(f"Are you sure you want to delete this {self.item_type}?", id="dialog-question")
            yield Static(f"[bold red]{self.item_path}[/]", id="dialog-path")
            if self.is_dir:
                # Directories are removed with shutil.rmtree, so warn the user.
                yield Static("[yellow]Warning: This will delete all contents recursively![/]")
            with Horizontal(id="dialog-buttons"):
                yield Button("Cancel", variant="primary", id="cancel-button")
                yield Button("Delete", variant="error", id="confirm-button")

    @on(Button.Pressed, "#cancel-button")
    def cancel_deletion(self) -> None:
        """Cancel the deletion operation."""
        logger.debug("Deletion canceled by user")
        # Dismiss with False so the caller's callback skips the delete.
        self.dismiss(False)

    @on(Button.Pressed, "#confirm-button")
    def confirm_deletion(self) -> None:
        """Confirm the deletion operation."""
        logger.debug(f"Deletion confirmed for {self.item_path}")
        self.dismiss(True)
class SortOptions(ModalScreen):
    """A modal dialog for selecting sort options."""

    def __init__(self):
        """Initialize the dialog (no state beyond the base screen)."""
        super().__init__()
        logger.debug("SortOptions dialog initialized")

    def compose(self) -> ComposeResult:
        """Compose the sort options dialog."""
        logger.debug("Composing SortOptions")
        with Container(id="sort-container"):
            yield Static("[bold]Sort Options[/bold]", id="sort-title")
            yield Static("Sort by:", id="sort-label")
            with RadioSet(id="sort-options"):
                # Size sorting is pre-selected as the default.
                yield RadioButton("Size (largest first)", id="sort-size", value=True)
                yield RadioButton("Name (A-Z)", id="sort-name")
                yield RadioButton("Path (A-Z)", id="sort-path")
            with Horizontal(id="sort-buttons"):
                yield Button("Cancel", variant="primary", id="sort-cancel")
                yield Button("Apply", variant="success", id="sort-apply")

    @on(Button.Pressed, "#sort-cancel")
    def cancel_sort(self) -> None:
        """Cancel the sort operation."""
        logger.debug("Sort dialog canceled by user")
        # Dismiss with None so the caller keeps the current sort order.
        self.dismiss(None)

    @on(Button.Pressed, "#sort-apply")
    def apply_sort(self) -> None:
        """Apply the selected sort option."""
        # The pressed radio button's id (e.g. "sort-size") doubles as the
        # sort-method key consumed by ReclaimedApp.apply_sort.
        sort_option = self.query_one("#sort-options").pressed_button.id
        logger.info(f"Sort method selected: {sort_option}")
        self.dismiss(sort_option)
class ReclaimedApp(App):
    """Textual app for reclaimed with interactive file management."""

    CSS = TEXTUAL_CSS

    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("f", "focus_files", "Focus Files"),
        Binding("d", "focus_dirs", "Focus Directories"),
        Binding("tab", "toggle_focus", "Toggle Focus"),
        Binding("s", "sort", "Sort"),
        Binding("r", "refresh", "Refresh"),
        Binding("delete", "delete_selected", "Delete"),
        Binding("?", "help", "Help"),
    ]

    def __init__(
        self,
        path: Path,
        options: ScanOptions,
        on_exit_callback: Optional[Callable] = None,
    ):
        """Initialize the app.

        Args:
            path: Root directory to scan (resolved to an absolute path).
            options: Scanner configuration (limits, skip list).
            on_exit_callback: Optional callable invoked when the app unmounts.
        """
        super().__init__()
        self.logger = logger
        self.path = path.resolve()
        self.options = options
        self.on_exit_callback = on_exit_callback
        self.scanner = DiskScanner(options)
        self.largest_files: List[FileInfo] = []
        self.largest_dirs: List[FileInfo] = []
        self.current_focus = "files"  # Tracks which table has focus
        self.sort_method = "sort-size"  # Default sort method
        self.progress_manager = None  # Will be initialized after mount
        self.logger.info(f"Initialized ReclaimedApp for path {self.path} with options {options}")

    def compose(self) -> ComposeResult:
        """Compose the app layout."""
        self.logger.debug("Composing application layout")
        yield Header(show_clock=True)
        yield Static("[bold]Reclaimed[/bold]", id="title")
        with Container(id="main-container"):
            # Status bar with scan info
            with Horizontal(id="status-bar"):
                yield Static("Path:", id="status-label")
                yield Static(f"{self.path}", id="path-display")
                yield Static("", id="scan-timer")
                yield Static("", id="scan-count")
            # Directories section
            yield Static("[bold]Largest Directories[/bold]", id="dirs-section-header")
            dirs_table = DataTable(id="dirs-table")
            dirs_table.add_columns("Size", "Storage", "Path")
            yield dirs_table
            # Files section
            yield Static("[bold]Largest Files[/bold]", id="files-section-header")
            files_table = DataTable(id="files-table")
            files_table.add_columns("Size", "Storage", "Path")
            yield files_table
        with Horizontal(id="footer-container"):
            yield Footer()
            yield LoadingIndicator(id="scan-progress")

    def on_mount(self) -> None:
        """Event handler called when the app is mounted."""
        self.logger.info("App mounted")
        self.progress_manager = ProgressManager(self, "main-container")
        self.scan_directory()
        # Defer focus and header checks until the first render has settled.
        self.set_timer(0.1, self.focus_active_table)
        self.set_timer(1.0, self.check_header_visibility)

    def scan_directory(self) -> None:
        """Scan the directory and update the tables incrementally."""
        self.logger.info("Starting directory scan; resetting internal data")
        self.largest_files = []
        self.largest_dirs = []
        self.start_time = time.monotonic()
        self.logger.info("Directory scan initiated at %s", time.strftime("%Y-%m-%d %H:%M:%S"))
        loading = self.query_one("#scan-progress")
        loading.styles.display = "block"
        self.logger.debug("Scan progress indicator activated")
        # The worker runs _scan_directory_worker in the background; its state
        # changes are handled in on_worker_state_changed.
        self.scan_task = self.run_worker(
            self._scan_directory_worker(),
            name="Directory Scanner",
            description="Scanning directory...",
        )

    async def _scan_directory_worker(self):
        """Worker function to process async generator from scan_async with optimized UI updates."""
        self.logger.debug("Scan worker started")
        last_ui_update = 0
        base_ui_update_interval = 0.5
        files_buffer = []
        dirs_buffer = []
        last_file_count = 0
        timer_display = self.query_one("#scan-timer")
        count_display = self.query_one("#scan-count")

        async def update_timer():
            # Refresh the elapsed-time display ~20x per second until cancelled.
            start = self.start_time
            while True:
                elapsed = time.monotonic() - start
                minutes, seconds = divmod(int(elapsed), 60)
                timer_display.update(f"Time: {minutes:02d}:{seconds:02d}")
                await asyncio.sleep(0.05)

        self._timer_task = asyncio.create_task(update_timer())
        self.logger.debug("Timer task launched")
        try:
            async for progress in self.scanner.scan_async(self.path):
                if not progress:
                    continue
                if progress.files:
                    files_buffer = progress.files
                if progress.dirs:
                    dirs_buffer = progress.dirs
                count_display.update(f"Files: {progress.scanned:,}")
                # Throttle table refreshes more aggressively as the scan grows
                # so the UI stays responsive on large trees.
                ui_update_interval = base_ui_update_interval
                if progress.scanned > 100000:
                    ui_update_interval = 5.0
                elif progress.scanned > 50000:
                    ui_update_interval = 3.0
                elif progress.scanned > 10000:
                    ui_update_interval = 2.0
                elif progress.scanned > 5000:
                    ui_update_interval = 1.0
                current_time = time.monotonic()
                force_update = progress.scanned - last_file_count > 5000
                time_to_update = current_time - last_ui_update > ui_update_interval
                if time_to_update or progress.progress >= 1.0 or force_update:
                    self.largest_files = files_buffer
                    self.largest_dirs = dirs_buffer
                    self.apply_sort(self.sort_method)
                    self.update_tables()
                    last_ui_update = current_time
                    last_file_count = progress.scanned
                    self.logger.debug("UI updated with latest scan progress")
                # Yield control so the event loop can process other tasks.
                await asyncio.sleep(0)
        except Exception as e:
            self.logger.error("Scan error: %s", e, exc_info=True)
            self.notify(f"Scan error: {str(e)}", severity="error")
            raise
        finally:
            # Clean up timer task
            if hasattr(self, "_timer_task"):
                self._timer_task.cancel()
                try:
                    await self._timer_task
                except asyncio.CancelledError:
                    pass
            # NOTE(review): `progress` is unbound here if scan_async yields
            # nothing, and a `return` inside `finally` suppresses the `raise`
            # above — confirm both behaviors are intended.
            if progress is not None:
                result = {
                    "files": self.largest_files,
                    "dirs": self.largest_dirs,
                    "total_size": self.scanner._total_size,
                    "file_count": self.scanner._file_count,
                }
                self.logger.info(
                    "Scan completed. Final results: %d files, %d directories, total size %d bytes",
                    len(result["files"]),
                    len(result["dirs"]),
                    result["total_size"],
                )
                return result

    async def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
        """Handle updates from the background scan task with optimized UI updates."""
        if event.worker.name != "Directory Scanner":
            return
        loading = self.query_one("#scan-progress")
        if event.worker.state == WorkerState.SUCCESS:
            loading.styles.display = "none"
            self.logger.info("Scan task finished successfully")
            if event.worker.result:
                result = event.worker.result
                file_count = result.get("file_count", 0)
                elapsed = time.monotonic() - self.start_time
                self.logger.info(
                    "Scan completed in %.1f seconds. Found %d files.",
                    elapsed,
                    file_count,
                )
                count_display = self.query_one("#scan-count")
                count_display.update(f"Files: {file_count:,}")
                self.apply_sort(self.sort_method)
                self.update_tables()
                self.focus_active_table()
            # Clean up timer task
            if hasattr(self, "_timer_task"):
                self._timer_task.cancel()
                try:
                    await self._timer_task
                except asyncio.CancelledError:
                    pass
        elif event.worker.state == WorkerState.ERROR:
            loading.styles.display = "none"
            self.logger.error("Scan task failed with an error")
            self.notify("Scan failed!", severity="error")

    # Cache of the items last rendered per table id, used to skip redundant
    # refreshes. NOTE(review): class-level mutable dict, shared across
    # instances — fine for a single-app process but confirm.
    _last_table_items = {}

    def update_tables(self) -> None:
        """Update both data tables with current data, avoiding redundant updates."""
        self._update_table_if_changed("#files-table", self.largest_files)
        self._update_table_if_changed("#dirs-table", self.largest_dirs)

    def _update_table_if_changed(self, table_id: str, items: List[FileInfo]) -> None:
        """Update a table only if its data has changed significantly."""
        if not items:
            return
        current_items = self._last_table_items.get(table_id, [])
        if len(current_items) == len(items):
            # Same length: compare only the first few entries as a cheap
            # change heuristic before rebuilding the whole table.
            check_count = min(5, len(items))
            items_changed = False
            for i in range(check_count):
                if (
                    i >= len(current_items)
                    or items[i].path != current_items[i].path
                    or items[i].size != current_items[i].size
                ):
                    items_changed = True
                    break
            if not items_changed:
                return
        self._last_table_items[table_id] = items
        self._update_table(table_id, items)

    def _update_table(self, table_id: str, items: List[FileInfo]) -> None:
        """Helper method to update a specific table with items."""
        table = self.query_one(table_id)
        table.clear()
        self.logger.debug(f"Clearing table {table_id}")
        # Cap the rendered rows at 100 to keep the DataTable fast.
        display_items = items[: min(100, len(items))]
        self.logger.debug(f"Updating {table_id} with {len(display_items)} items")
        for item_info in display_items:
            self._add_row_to_table(table, item_info)

    def _add_row_to_table(self, table, item_info: FileInfo) -> None:
        """Add a single row to a table."""
        try:
            rel_path = item_info.path.relative_to(self.path)
        except ValueError:
            # Path lies outside the scan root; fall back to the absolute path.
            rel_path = item_info.path
            self.logger.warning(
                f"Path {item_info.path} is not under the scanned directory; using absolute path."
            )
        storage_status = "☁️ iCloud" if item_info.is_icloud else "💾 Local"
        storage_cell = Text(
            storage_status,
            style="#268bd2" if item_info.is_icloud else "#859900",
        )
        table.add_row(
            format_size(item_info.size),
            storage_cell,
            str(rel_path),
            key=str(item_info.path),
        )

    # Sort bookkeeping used by apply_sort to skip redundant re-sorts.
    _current_sort_method = "sort-size"
    _files_sorted = False
    _dirs_sorted = False

    def apply_sort(self, sort_method: str) -> None:
        """Apply the selected sort method to the data, avoiding redundant sorts."""
        if not self.largest_files and not self.largest_dirs:
            return
        if (
            sort_method == self._current_sort_method
            and self._files_sorted
            and self._dirs_sorted
        ):
            self.logger.debug("Sort method unchanged; skipping sort")
            return
        # Map radio-button ids to sort key functions.
        sort_keys = {
            "sort-size": lambda x: -x.size,
            "sort-name": lambda x: x.path.name.lower(),
            "sort-path": lambda x: str(x.path).lower(),
        }
        key_func = sort_keys.get(sort_method)
        if key_func:
            if self.largest_files:
                self.largest_files.sort(key=key_func)
                self._files_sorted = True
            if self.largest_dirs:
                self.largest_dirs.sort(key=key_func)
                self._dirs_sorted = True
            self._current_sort_method = sort_method
            self.logger.debug(f"Applied sort method: {sort_method}")
        else:
            self.logger.warning(f"Invalid sort method: {sort_method}")

    def action_focus_files(self) -> None:
        """Focus the files table."""
        self.current_focus = "files"
        self.logger.debug("Focus set to files table")
        self.focus_active_table()

    def action_focus_dirs(self) -> None:
        """Focus the directories table."""
        self.current_focus = "dirs"
        self.logger.debug("Focus set to directories table")
        self.focus_active_table()

    def action_toggle_focus(self) -> None:
        """Toggle focus between files and directories tables."""
        self.current_focus = "dirs" if self.current_focus == "files" else "files"
        self.logger.debug(f"Focus toggled to {self.current_focus}")
        self.focus_active_table()

    def action_sort(self) -> None:
        """Show the sort options dialog."""
        self.logger.debug("Opening sort options dialog")

        def handle_sort_result(sort_option: Optional[str]) -> None:
            # Callback invoked when the SortOptions screen is dismissed.
            if sort_option:
                self.sort_method = sort_option
                self.apply_sort(sort_option)
                self.update_tables()
                self.focus_active_table()
                self.logger.info(f"Sort method applied: {sort_option}")
            else:
                self.logger.debug("Sort dialog result was None")

        self.push_screen(SortOptions(), handle_sort_result)

    def action_refresh(self) -> None:
        """Refresh the directory scan."""
        self.logger.info("User initiated refresh; restarting scan")
        self.scan_directory()

    def action_delete_selected(self) -> None:
        """Delete the selected file or directory."""
        self.logger.debug("Deleting selected item")
        table_id = "#files-table" if self.current_focus == "files" else "#dirs-table"
        table = self.query_one(table_id)
        if table.cursor_coordinate is not None:
            row = table.cursor_coordinate.row
            if row < len(table.rows):
                try:
                    if self.current_focus == "files":
                        path = self.largest_files[row].path
                    elif self.current_focus == "dirs":
                        path = self.largest_dirs[row].path
                    else:
                        self.logger.error(
                            "Unexpected current_focus value: %s", self.current_focus
                        )
                        return
                    is_dir = path.is_dir()
                    self.logger.debug(f"SelectedItem: path={path}, is_dir={is_dir}")

                    def handle_confirmation(confirmed: bool) -> None:
                        # Runs when the ConfirmationDialog is dismissed.
                        if confirmed:
                            try:
                                if is_dir:
                                    shutil.rmtree(path)
                                else:
                                    os.remove(path)
                                self.logger.info(f"Successfully deleted {path}")
                                self.notify(f"Successfully deleted {path}", timeout=5)
                            except Exception as e:
                                self.logger.error(
                                    f"Error deleting {path}: {str(e)}", exc_info=True
                                )
                                self.notify(f"Error deleting {path}: {e}", timeout=5)
                        else:
                            self.logger.debug("Deletion canceled by user")

                    self.push_screen(
                        ConfirmationDialog(path, is_dir), handle_confirmation
                    )
                except IndexError:
                    self.logger.error("Selected row index out of bounds")
                    self.notify("Invalid selection", timeout=5)

    def action_help(self) -> None:
        """Show help information."""
        self.logger.debug("Help menu accessed")
        help_text = """
[#93a1a1]Reclaimed Help[/]
[#268bd2]Navigation:[/]
- Arrow keys: Navigate within a table
- F: Focus Files table
- D: Focus Directories table
- Tab: Move between tables
[#268bd2]Actions:[/]
- Delete: Delete selected item
- S: Sort items
- R: Refresh scan
- Q: Quit application
[#268bd2]Selection:[/]
- Click on a row to select it
- Press Delete to remove the selected item
"""
        self.notify(help_text, timeout=10)

    def on_data_table_row_selected(self, event) -> None:
        """Handle row selection in data tables."""
        table_id = event.data_table.id
        row = event.cursor_coordinate.row
        if table_id == "files-table":
            items = self.largest_files
            self.current_focus = "files"
        else:
            items = self.largest_dirs
            self.current_focus = "dirs"
        path = str(items[row].path) if 0 <= row < len(items) else "None"
        self.logger.debug(f"Row selected in {table_id}: row {row}, path={path}")
        self.notify(f"Selected: {path}", timeout=3)

    def check_header_visibility(self) -> None:
        """Check header visibility after a short delay."""
        try:
            dirs_header = self.query_one("#dirs-section-header")
            files_header = self.query_one("#files-section-header")
            self.logger.debug(
                f"dirs_header visible: {dirs_header.styles.display}, "
                f"files_header visible: {files_header.styles.display}"
            )
            self.logger.debug(
                f"dirs_header text: {dirs_header.render()}, "
                f"files_header text: {files_header.render()}"
            )
            all_widgets = list(self.query("Static"))
            self.logger.debug("Current widget order:")
            for i, widget in enumerate(all_widgets):
                self.logger.debug(f"Widget {i}: {widget.id} - {widget.render()}")
        except Exception as e:
            self.logger.error("Error checking headers: %s", str(e), exc_info=True)

    def focus_active_table(self) -> None:
        """Focus the currently active table based on current_focus."""
        table_id = "#files-table" if self.current_focus == "files" else "#dirs-table"
        self.logger.debug(f"Setting focus to {table_id}")
        table = self.query_one(table_id)
        if len(table.rows) > 0:
            self.set_focus(table)
            if table.cursor_coordinate is None:
                table.move_cursor(row=0, column=0)
                self.logger.debug(f"Cursor set to first row in {table_id}")

    def on_unmount(self) -> None:
        """Event handler called when app is unmounted."""
        self.logger.info("Application unmounting")
        if self.on_exit_callback:
            self.logger.debug("Calling on_exit_callback")
            self.on_exit_callback()
def run_textual_ui(
    path: Path,
    max_files: int = 100,
    max_dirs: int = 100,
    skip_dirs: Optional[list[str]] = None,
    debug: bool = False,
) -> None:
    """Run the Textual UI application.

    Args:
        path: Directory to scan
        max_files: Maximum number of files to show
        max_dirs: Maximum number of directories to show
        skip_dirs: List of directory names to skip
        debug: Enable debug logging (DEBUG level); otherwise INFO and above
    """
    if skip_dirs is None:
        # Default exclusions: macOS Trash and the Windows restore directory.
        skip_dirs = [".Trash", "System Volume Information"]
    # Configure logging before the app is constructed so every component
    # logs from its first instantiation onward.
    configure_logging(debug)
    options = ScanOptions(max_files=max_files, max_dirs=max_dirs, skip_dirs=skip_dirs)
    app = ReclaimedApp(path, options)
    app.run()
- Key Changes:
- Logger Initialization: A module-level logger named reclaimed.textual_app is created.
- Optional Debug Mode: The configure_logging function adjusts the logging level based on the debug flag passed to run_textual_ui.
- Comprehensive Logging: Added debug, info, and error logs throughout the application, including:
- Initialization and configuration steps.
- Scan process start, progress, and completion.
- User interactions like focus changes, deletions, and sorting.
- Table updates and UI state checks.
- Replacement of Print Statements: Debug print statements in check_header_visibility are replaced with logger.debug calls.
- Exception Handling: All except blocks now log exceptions with exc_info for detailed traceback.
- Usage:
- To enable debug mode, call run_textual_ui with the debug=True parameter:
- python
- Copy
- run_textual_ui(Path("/path/to/directory"), debug=True)
- This will output debug-level logs to the console. Without the debug flag, only info and higher severity logs are shown.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement