Advertisement
Guest User

claude

a guest
Mar 5th, 2025
84
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 31.91 KB | None | 0 0
  1. import asyncio
  2. import logging
  3. import os
  4. import shutil
  5. import time
  6. from pathlib import Path
  7. from typing import Callable, List, Optional
  8.  
  9. from rich.logging import RichHandler
  10. from rich.text import Text
  11. from textual import on
  12. from textual.app import App, ComposeResult
  13. from textual.binding import Binding
  14. from textual.containers import Container, Horizontal
  15. from textual.screen import ModalScreen
  16. from textual.widgets import (
  17. Button,
  18. DataTable,
  19. Footer,
  20. Header,
  21. LoadingIndicator,
  22. RadioButton,
  23. RadioSet,
  24. Static,
  25. )
  26. from textual.worker import Worker, WorkerState
  27.  
  28. from ..core import DiskScanner, FileInfo, ScanOptions
  29. from ..utils.formatters import format_size
  30. from .styles import TEXTUAL_CSS
  31.  
  32.  
  33. # Configure logger
  34. logger = logging.getLogger("reclaimed")
  35.  
  36.  
  37. class ProgressManager:
  38. """Manages progress bar lifecycle to prevent duplicate IDs and provide smoother updates."""
  39.  
  40. def __init__(self, app: App, container_id: str):
  41. """Initialize the progress manager.
  42.  
  43. Args:
  44. app: The parent Textual app
  45. container_id: ID of the container to mount progress bars in
  46. """
  47. self.app = app
  48. self.container_id = container_id
  49. self.last_update_time = 0
  50. self.update_interval = 0.1 # Update at most 10 times per second
  51. self.last_progress_value = 0
  52. self.min_progress_increment = 0.005 # Minimum 0.5% change to update
  53. logger.debug("ProgressManager initialized with container_id: %s", container_id)
  54.  
  55.  
  56. class ConfirmationDialog(ModalScreen):
  57. """A modal dialog for confirming file/folder deletion."""
  58.  
  59. def __init__(self, item_path: Path, is_dir: bool = False):
  60. super().__init__()
  61. self.item_path = item_path
  62. self.is_dir = is_dir
  63. self.item_type = "directory" if is_dir else "file"
  64. logger.debug("ConfirmationDialog created for %s: %s", self.item_type, self.item_path)
  65.  
  66. def compose(self) -> ComposeResult:
  67. """Compose the confirmation dialog."""
  68. with Container(id="dialog-container"):
  69. yield Static(
  70. f"Are you sure you want to delete this {self.item_type}?", id="dialog-title"
  71. )
  72. yield Static(f"[bold red]{self.item_path}[/]", id="dialog-path")
  73. if self.is_dir:
  74. yield Static("[yellow]Warning: This will delete all contents recursively![/]")
  75. with Horizontal(id="dialog-buttons"):
  76. yield Button("Cancel", variant="primary", id="cancel-button")
  77. yield Button("Delete", variant="error", id="confirm-button")
  78.  
  79. @on(Button.Pressed, "#cancel-button")
  80. def cancel_deletion(self) -> None:
  81. """Cancel the deletion operation."""
  82. logger.info("Deletion canceled for: %s", self.item_path)
  83. self.dismiss(False)
  84.  
  85. @on(Button.Pressed, "#confirm-button")
  86. def confirm_deletion(self) -> None:
  87. """Confirm the deletion operation."""
  88. logger.info("Deletion confirmed for: %s", self.item_path)
  89. self.dismiss(True)
  90.  
  91.  
  92. class SortOptions(ModalScreen):
  93. """A modal dialog for selecting sort options."""
  94.  
  95. def compose(self) -> ComposeResult:
  96. """Compose the sort options dialog."""
  97. with Container(id="sort-container"):
  98. yield Static("Sort by:", id="sort-title")
  99. with RadioSet(id="sort-options"):
  100. yield RadioButton("Size (largest first)", id="sort-size", value=True)
  101. yield RadioButton("Name (A-Z)", id="sort-name")
  102. yield RadioButton("Path (A-Z)", id="sort-path")
  103. with Horizontal(id="sort-buttons"):
  104. yield Button("Cancel", variant="primary", id="sort-cancel")
  105. yield Button("Apply", variant="success", id="sort-apply")
  106.  
  107. @on(Button.Pressed, "#sort-cancel")
  108. def cancel_sort(self) -> None:
  109. """Cancel the sort operation."""
  110. logger.debug("Sort operation canceled")
  111. self.dismiss(None)
  112.  
  113. @on(Button.Pressed, "#sort-apply")
  114. def apply_sort(self) -> None:
  115. """Apply the selected sort option."""
  116. sort_option = self.query_one("#sort-options").pressed_button.id
  117. logger.info("Sort option selected: %s", sort_option)
  118. self.dismiss(sort_option)
  119.  
  120.  
  121. class ReclaimedApp(App):
  122. """Textual app for reclaimed with interactive file management."""
  123.  
  124. CSS = TEXTUAL_CSS
  125. BINDINGS = [
  126. Binding("q", "quit", "Quit"),
  127. Binding("f", "focus_files", "Focus Files"),
  128. Binding("d", "focus_dirs", "Focus Directories"),
  129. Binding("tab", "toggle_focus", "Toggle Focus"),
  130. Binding("s", "sort", "Sort"),
  131. Binding("r", "refresh", "Refresh"),
  132. Binding("delete", "delete_selected", "Delete"),
  133. Binding("?", "help", "Help"),
  134. ]
  135.  
  136. def __init__(
  137. self,
  138. path: Path,
  139. options: ScanOptions,
  140. on_exit_callback: Optional[Callable] = None,
  141. debug: bool = False,
  142. ):
  143. """Initialize the app with the path to scan.
  144.  
  145. Args:
  146. path: Directory to scan
  147. options: Scan configuration options
  148. on_exit_callback: Optional callback to run on exit
  149. debug: Enable debug logging
  150. """
  151. super().__init__()
  152. self.path = path.resolve()
  153. self.options = options
  154. self.on_exit_callback = on_exit_callback
  155. self.scanner = DiskScanner(options)
  156. self.largest_files: List[FileInfo] = []
  157. self.largest_dirs: List[FileInfo] = []
  158. self.current_focus = "files" # Tracks which table has focus
  159. self.sort_method = "sort-size" # Default sort method
  160. self.progress_manager = None # Will be initialized after mount
  161.  
  162. # Set debug mode
  163. self.debug = debug
  164. logger.debug("ReclaimedApp initialized with path: %s, debug mode: %s", self.path, self.debug)
  165.  
  166. def compose(self) -> ComposeResult:
  167. """Compose the app layout."""
  168. logger.debug("Composing app layout")
  169. yield Header(show_clock=True)
  170. yield Static("[bold]Reclaimed[/bold]", id="title")
  171. with Container(id="main-container"):
  172. # Status bar with scan info
  173. with Horizontal(id="status-bar"):
  174. yield Static("Path:", id="status-label")
  175. yield Static(f"{self.path}", id="path-display")
  176. yield Static("", id="scan-timer")
  177. yield Static("", id="scan-count")
  178.  
  179. # Directories section
  180. yield Static("[bold]Largest Directories[/bold]", id="dirs-section-header")
  181. dirs_table = DataTable(id="dirs-table")
  182. dirs_table.add_columns("Size", "Storage", "Path")
  183. yield dirs_table
  184.  
  185. # Files section
  186. yield Static("[bold]Largest Files[/bold]", id="files-section-header")
  187. files_table = DataTable(id="files-table")
  188. files_table.add_columns("Size", "Storage", "Path")
  189. yield files_table
  190.  
  191. with Horizontal(id="footer-container"):
  192. yield Footer()
  193. yield LoadingIndicator(id="scan-progress")
  194.  
  195. def on_mount(self) -> None:
  196. """Event handler called when the app is mounted."""
  197. logger.info("App mounted, initializing...")
  198.  
  199. # Initialize progress manager
  200. self.progress_manager = ProgressManager(self, "main-container")
  201.  
  202. # Start the initial scan
  203. self.scan_directory()
  204.  
  205. # Set initial focus to the files table after scan completes
  206. self.set_timer(0.1, self.focus_active_table)
  207.  
  208. # Check header visibility again after a short delay
  209. self.set_timer(1.0, self.check_header_visibility)
  210.  
  211. logger.debug("App mount complete")
  212.  
  213. def scan_directory(self) -> None:
  214. """Scan the directory and update the tables incrementally."""
  215. logger.info("Starting directory scan for: %s", self.path)
  216.  
  217. # Reset state before starting new scan
  218. self.largest_files = []
  219. self.largest_dirs = []
  220.  
  221. # Start timing with monotonic clock
  222. self.start_time = time.monotonic()
  223.  
  224. # Notify user that scan is starting
  225. self.notify("Starting directory scan...", timeout=2)
  226.  
  227. # Reset sort tracking
  228. self._files_sorted = False
  229. self._dirs_sorted = False
  230.  
  231. # Show loading indicator
  232. loading = self.query_one("#scan-progress")
  233. loading.styles.display = "block"
  234.  
  235. # Start async scan with optimized worker function
  236. self.scan_task = self.run_worker(
  237. self._scan_directory_worker(),
  238. name="Directory Scanner",
  239. description="Scanning directory...",
  240. )
  241. logger.debug("Directory scan worker started")
  242.  
  243. async def _scan_directory_worker(self):
  244. """Worker function to process async generator from scan_async with optimized UI updates."""
  245. logger.debug("Scan directory worker function started")
  246.  
  247. # Track when we last updated the UI
  248. last_ui_update = 0
  249. base_ui_update_interval = 0.5
  250.  
  251. # Get UI elements once
  252. timer_display = self.query_one("#scan-timer")
  253. count_display = self.query_one("#scan-count")
  254.  
  255. # Create independent timer task
  256. async def update_timer():
  257. start = time.monotonic()
  258. while True:
  259. elapsed = time.monotonic() - start
  260. minutes, seconds = divmod(int(elapsed), 60)
  261. timer_display.update(f"Time: {minutes:02d}:{seconds:02d}")
  262. await asyncio.sleep(0.05) # Update 20 times per second for smooth display
  263.  
  264. # Start timer task and store reference
  265. self._timer_task = asyncio.create_task(update_timer())
  266.  
  267. # Buffers to collect data between UI updates
  268. files_buffer = []
  269. dirs_buffer = []
  270. last_file_count = 0
  271.  
  272. # Initialize progress with default values in case of early exception
  273. progress = None
  274. current_time = time.monotonic()
  275.  
  276. try:
  277. logger.debug("Beginning async scan process")
  278. async for progress in self.scanner.scan_async(self.path):
  279. if not progress:
  280. continue
  281.  
  282. # Update our data in memory
  283. if progress.files:
  284. files_buffer = progress.files
  285. logger.debug("Received %d files in progress update", len(files_buffer))
  286. if progress.dirs:
  287. dirs_buffer = progress.dirs
  288. logger.debug("Received %d directories in progress update", len(dirs_buffer))
  289.  
  290. # Update file count independently
  291. count_display.update(f"Files: {progress.scanned:,}")
  292.  
  293. # Dynamically adjust update interval based on files scanned
  294. ui_update_interval = base_ui_update_interval
  295. if progress.scanned > 100000:
  296. ui_update_interval = 5.0
  297. elif progress.scanned > 50000:
  298. ui_update_interval = 3.0
  299. elif progress.scanned > 10000:
  300. ui_update_interval = 2.0
  301. elif progress.scanned > 5000:
  302. ui_update_interval = 1.0
  303.  
  304. # Check if it's time to update tables
  305. current_time = time.monotonic()
  306. if current_time - last_ui_update > ui_update_interval:
  307. logger.debug("Updating UI with new data (scanned: %d files)", progress.scanned)
  308. self.largest_files = files_buffer
  309. self.largest_dirs = dirs_buffer
  310. self.apply_sort(self.sort_method)
  311. self.update_tables()
  312. last_ui_update = current_time
  313. last_file_count = progress.scanned
  314.  
  315. await asyncio.sleep(0)
  316.  
  317. except Exception as e:
  318. logger.exception("Scan error: %s", str(e))
  319. self.notify(f"Scan error: {str(e)}", severity="error")
  320. raise
  321.  
  322. finally:
  323. # Always clean up the timer task
  324. if hasattr(self, "_timer_task"):
  325. logger.debug("Cancelling timer task")
  326. self._timer_task.cancel()
  327. try:
  328. await self._timer_task
  329. except asyncio.CancelledError:
  330. pass
  331.  
  332. # Dynamically adjust update interval based on files scanned
  333. ui_update_interval = base_ui_update_interval
  334.  
  335. # Only process progress data if we have a valid progress object
  336. if progress is not None:
  337. if progress.scanned > 100000:
  338. ui_update_interval = 5.0 # Very infrequent updates for huge directories
  339. elif progress.scanned > 50000:
  340. ui_update_interval = 3.0 # Very infrequent updates for very large directories
  341. elif progress.scanned > 10000:
  342. ui_update_interval = 2.0 # Less frequent updates for large directories
  343. elif progress.scanned > 5000:
  344. ui_update_interval = 1.0 # Moderate updates for medium directories
  345.  
  346. # Force an update if we've scanned a lot more files since the last update
  347. # This ensures we show progress even during long update intervals
  348. force_update = progress.scanned - last_file_count > 5000
  349.  
  350. # Use adaptive interval between UI updates
  351. time_to_update = current_time - last_ui_update > ui_update_interval
  352.  
  353. # Only update UI periodically, on completion, or when forced
  354. if time_to_update or progress.progress >= 1.0 or force_update:
  355. logger.debug("Final UI update with %d files, %d dirs",
  356. len(files_buffer), len(dirs_buffer))
  357. # Update our data
  358. self.largest_files = files_buffer
  359. self.largest_dirs = dirs_buffer
  360.  
  361. # Apply sort and update tables
  362. self.apply_sort(self.sort_method)
  363. self.update_tables()
  364. last_ui_update = current_time
  365. last_file_count = progress.scanned
  366.  
  367. # Brief yield to allow UI to update, but keep it minimal
  368. await asyncio.sleep(0)
  369.  
  370. # Return final data
  371. logger.info("Scan completed with %d files, %d dirs",
  372. len(self.largest_files), len(self.largest_dirs))
  373. return {
  374. "files": self.largest_files,
  375. "dirs": self.largest_dirs,
  376. "total_size": self.scanner._total_size,
  377. "file_count": self.scanner._file_count,
  378. }
  379.  
  380. async def on_worker_state_changed(self, event: Worker.StateChanged) -> None:
  381. """Handle updates from the background scan task with optimized UI updates."""
  382. if event.worker.name != "Directory Scanner":
  383. return
  384.  
  385. # Get loading indicator
  386. loading = self.query_one("#scan-progress")
  387.  
  388. if event.worker.state == WorkerState.SUCCESS:
  389. logger.info("Directory scanner completed successfully")
  390. # Hide loading indicator
  391. loading.styles.display = "none"
  392.  
  393. # Get result data from worker
  394. file_count = 0
  395. if event.worker.result:
  396. result = event.worker.result
  397. file_count = result.get("file_count", 0)
  398.  
  399. # Only update UI if we have new data
  400. if "files" in result and result["files"]:
  401. self.largest_files = result["files"]
  402. self._files_sorted = False
  403. if "dirs" in result and result["dirs"]:
  404. self.largest_dirs = result["dirs"]
  405. self._dirs_sorted = False
  406.  
  407. # Get elapsed time for notification
  408. elapsed = time.monotonic() - self.start_time
  409.  
  410. # Update final file count
  411. count_display = self.query_one("#scan-count")
  412. count_display.update(f"Files: {file_count:,}")
  413.  
  414. # Show completion notification
  415. self.notify(f"Scan complete in {elapsed:.1f}s. Found {file_count:,} files.", timeout=5)
  416.  
  417. # Clean up timer task
  418. if hasattr(self, "_timer_task"):
  419. logger.debug("Cancelling timer task after scan completion")
  420. self._timer_task.cancel()
  421. try:
  422. await self._timer_task
  423. except asyncio.CancelledError:
  424. pass
  425.  
  426. # Apply sort and update tables only once at the end
  427. self.apply_sort(self.sort_method)
  428. self.update_tables()
  429.  
  430. # focus the active table
  431. self.focus_active_table()
  432.  
  433. elif event.worker.state == WorkerState.ERROR:
  434. logger.error("Directory scanner failed")
  435. # Hide loading indicator
  436. loading.styles.display = "none"
  437. self.notify("Scan failed!", severity="error")
  438.  
  439. # Track last table update to avoid redundant updates
  440. _last_table_update = {}
  441. _last_table_items = {}
  442.  
  443. def update_tables(self) -> None:
  444. """Update both data tables with current data, avoiding redundant updates."""
  445. logger.debug("Updating tables with current data")
  446. # Update files table if data has changed
  447. self._update_table_if_changed("#files-table", self.largest_files)
  448.  
  449. # Update dirs table if data has changed
  450. self._update_table_if_changed("#dirs-table", self.largest_dirs)
  451.  
  452. def _update_table_if_changed(self, table_id: str, items: List[FileInfo]) -> None:
  453. """Update a table only if its data has changed significantly.
  454.  
  455. Args:
  456. table_id: CSS selector for the table
  457. items: List of FileInfo objects to display
  458. """
  459. # Skip update if no items
  460. if not items:
  461. logger.debug("No items to update for table %s", table_id)
  462. return
  463.  
  464. # Check if data has changed significantly
  465. current_items = self._last_table_items.get(table_id, [])
  466.  
  467. # If item count is the same, check if top items are the same
  468. if len(current_items) == len(items):
  469. # Only check the first few items for performance
  470. check_count = min(5, len(items))
  471. items_changed = False
  472. for i in range(check_count):
  473. if (
  474. i >= len(current_items)
  475. or items[i].path != current_items[i].path
  476. or items[i].size != current_items[i].size
  477. ):
  478. items_changed = True
  479. break
  480.  
  481. if not items_changed:
  482. # Data hasn't changed significantly, skip update
  483. logger.debug("Table %s data unchanged, skipping update", table_id)
  484. return
  485.  
  486. # Update last items
  487. self._last_table_items[table_id] = items
  488.  
  489. # Now update the table
  490. self._update_table(table_id, items)
  491.  
  492. def _update_table(self, table_id: str, items: List[FileInfo]) -> None:
  493. """Helper method to update a specific table with items.
  494.  
  495. Args:
  496. table_id: CSS selector for the table
  497. items: List of FileInfo objects to display
  498. """
  499. logger.debug("Updating table %s with %d items", table_id, len(items))
  500. table = self.query_one(table_id)
  501. table.clear()
  502. table.can_focus = True
  503.  
  504. # Skip update if no items
  505. if not items:
  506. return
  507.  
  508. # Limit the number of items to display for better performance
  509. display_items = items[: min(100, len(items))]
  510.  
  511. # Render all items at once - Textual's DataTable has built-in virtualization
  512. for item_info in display_items:
  513. self._add_row_to_table(table, item_info)
  514.  
  515. def _add_row_to_table(self, table, item_info: FileInfo) -> None:
  516. """Add a single row to a table.
  517.  
  518. Args:
  519. table: The DataTable to add the row to
  520. item_info: FileInfo object with data for the row
  521. """
  522. try:
  523. rel_path = item_info.path.relative_to(self.path)
  524. except ValueError:
  525. rel_path = item_info.path
  526.  
  527. storage_status = "☁️ iCloud" if item_info.is_icloud else "💾 Local"
  528. storage_cell = Text(storage_status, style="#268bd2" if item_info.is_icloud else "#859900")
  529.  
  530. table.add_row(
  531. format_size(item_info.size),
  532. storage_cell,
  533. str(rel_path),
  534. key=str(item_info.path)
  535. )
  536.  
  537. # Track current sort state to avoid redundant sorts
  538. _current_sort_method = "sort-size"
  539. _files_sorted = False
  540. _dirs_sorted = False
  541.  
  542. def apply_sort(self, sort_method: str) -> None:
  543. """Apply the selected sort method to the data, avoiding redundant sorts."""
  544. # Skip if no data to sort
  545. if not self.largest_files and not self.largest_dirs:
  546. logger.debug("No data to sort")
  547. return
  548.  
  549. # Skip if sort method hasn't changed and data is already sorted
  550. if sort_method == self._current_sort_method and self._files_sorted and self._dirs_sorted:
  551. logger.debug("Sort method unchanged and data already sorted")
  552. return
  553.  
  554. logger.info("Applying sort method: %s", sort_method)
  555.  
  556. # Define sort keys based on method
  557. sort_keys = {
  558. "sort-size": lambda x: -x.size, # Negative for descending order
  559. "sort-name": lambda x: x.path.name.lower(),
  560. "sort-path": lambda x: str(x.path).lower(),
  561. }
  562.  
  563. # Get the appropriate sort key function
  564. key_func = sort_keys.get(sort_method)
  565. if not key_func:
  566. logger.warning("Invalid sort method: %s", sort_method)
  567. return # Invalid sort method
  568.  
  569. # Only sort if we have data and sort method has changed
  570. if self.largest_files:
  571. logger.debug("Sorting %d files", len(self.largest_files))
  572. self.largest_files.sort(key=key_func)
  573. self._files_sorted = True
  574.  
  575. if self.largest_dirs:
  576. logger.debug("Sorting %d directories", len(self.largest_dirs))
  577. self.largest_dirs.sort(key=key_func)
  578. self._dirs_sorted = True
  579.  
  580. # Update current sort method
  581. self._current_sort_method = sort_method
  582.  
  583. def action_focus_files(self) -> None:
  584. """Focus the files table."""
  585. logger.debug("Focusing files table")
  586. self.current_focus = "files"
  587. self.focus_active_table()
  588.  
  589. def action_focus_dirs(self) -> None:
  590. """Focus the directories table."""
  591. logger.debug("Focusing directories table")
  592. self.current_focus = "dirs"
  593. self.focus_active_table()
  594.  
  595. def action_toggle_focus(self) -> None:
  596. """Toggle focus between files and directories tables."""
  597. self.current_focus = "dirs" if self.current_focus == "files" else "files"
  598. logger.debug("Toggled focus to %s table", self.current_focus)
  599. self.focus_active_table()
  600.  
  601. def action_sort(self) -> None:
  602. """Show the sort options dialog."""
  603. logger.debug("Opening sort options dialog")
  604.  
  605. def handle_sort_result(sort_option: Optional[str]) -> None:
  606. if sort_option:
  607. logger.info("Sort option selected: %s", sort_option)
  608. self.sort_method = sort_option
  609. self.apply_sort(sort_option)
  610. self.update_tables()
  611. self.focus_active_table()
  612.  
  613. self.push_screen(SortOptions(), handle_sort_result)
  614.  
  615. def action_refresh(self) -> None:
  616. """Refresh the directory scan."""
  617. logger.info("Refreshing directory scan")
  618. self.scan_directory()
  619.  
  620. def action_delete_selected(self) -> None:
  621. """Delete the selected file or directory."""
  622. # Get the current table based on the focus
  623. table = self.query_one("#files-table" if self.current_focus == "files" else "#dirs-table")
  624.  
  625. # Check if a row is selected
  626. if table.cursor_coordinate is not None:
  627. row = table.cursor_coordinate.row
  628. if row < len(table.rows):
  629. # Get the path from the row key
  630. # Get row data (unused but kept for potential future use)
  631. table.get_row_at(row)
  632.  
  633. # In the current version of Textual, we need to access the key differently
  634. # The key is stored when we add the row, so we need to look it up in our data
  635. if self.current_focus == "files" and row < len(self.largest_files):
  636. path = self.largest_files[row].path
  637. elif self.current_focus == "dirs" and row < len(self.largest_dirs):
  638. path = self.largest_dirs[row].path
  639. else:
  640. logger.warning("Could not determine path for selected item")
  641. self.notify("Could not determine the path for this item", timeout=5)
  642. return
  643.  
  644. is_dir = path.is_dir()
  645. logger.info("Requesting deletion confirmation for %s: %s",
  646. "directory" if is_dir else "file", path)
  647.  
  648. # Show confirmation dialog
  649. def handle_confirmation(confirmed: bool) -> None:
  650. if confirmed:
  651. try:
  652. if is_dir:
  653. logger.info("Deleting directory: %s", path)
  654. shutil.rmtree(path)
  655. else:
  656. logger.info("Deleting file: %s", path)
  657. os.remove(path)
  658. self.notify(f"Successfully deleted {path}", timeout=5)
  659. except Exception as e:
  660. logger.exception("Error deleting %s: %s", path, e)
  661. self.notify(f"Error deleting {path}: {e}", timeout=5)
  662.  
  663. self.push_screen(ConfirmationDialog(path, is_dir), handle_confirmation)
  664.  
    def action_help(self) -> None:
        """Show the key-binding help text as a notification lasting 10 seconds."""
        logger.debug("Displaying help information")
        # The text uses [#rrggbb]...[/] colour tags for the title and headings.
        # NOTE(review): this literal was recovered from a paste that lost all
        # leading whitespace -- confirm the original indentation of its lines.
        help_text = """
[#93a1a1]Reclaimed Help[/]
[#268bd2]Navigation:[/]
- Arrow keys: Navigate within a table
- F: Focus Files table
- D: Focus Directories table
- Tab: Move between tables
[#268bd2]Actions:[/]
- Delete: Delete selected item
- S: Sort items
- R: Refresh scan
- Q: Quit application
[#268bd2]Selection:[/]
- Click on a row to select it
- Press Delete to remove the selected item
"""
        self.notify(help_text, timeout=10)
  685.  
  686. # Tab button handlers removed as we now have a unified view
  687. def on_data_table_row_selected(self, event) -> None:
  688. """Handle row selection in data tables."""
  689. table_id = event.data_table.id
  690. row = event.cursor_coordinate.row
  691.  
  692. # Update current_focus based on which table was selected
  693. if table_id == "files-table":
  694. items = self.largest_files
  695. self.current_focus = "files"
  696. else:
  697. items = self.largest_dirs
  698. self.current_focus = "dirs"
  699.  
  700. if 0 <= row < len(items):
  701. path = items[row].path
  702. logger.debug("Selected item: %s", path)
  703. self.notify(f"Selected: {path}", timeout=3)
  704.  
  705. def check_header_visibility(self) -> None:
  706. """Check header visibility after a delay."""
  707. try:
  708. # Only log if in debug mode to reduce noise
  709. if self.debug:
  710. # Debug header visibility
  711. dirs_header = self.query_one("#dirs-section-header")
  712. files_header = self.query_one("#files-section-header")
  713. logger.debug("dirs_header visible: %s", dirs_header.styles.display)
  714. logger.debug("files_header visible: %s", files_header.styles.display)
  715. logger.debug("dirs_header text: %s", dirs_header.render())
  716. logger.debug("files_header text: %s", files_header.render())
  717.  
  718. # Check the DOM order
  719. all_widgets = list(self.query("Static"))
  720. logger.debug("Widget order in DOM:")
  721. for i, widget in enumerate(all_widgets):
  722. logger.debug("%d: %s - %s", i, widget.id, widget.render())
  723. except Exception as e:
  724. logger.exception("Error checking headers: %s", e)
  725.  
  726. def focus_active_table(self) -> None:
  727. """Focus the currently active table based on current_focus."""
  728. table_id = "#files-table" if self.current_focus == "files" else "#dirs-table"
  729. table = self.query_one(table_id)
  730.  
  731. # Only set focus if the table has rows
  732. if len(table.rows) > 0:
  733. logger.debug("Focusing table: %s", table_id)
  734. self.set_focus(table)
  735.  
  736. # Set cursor to first row if no row is selected
  737. if table.cursor_coordinate is None:
  738. table.move_cursor(row=0, column=0)
  739.  
  740. def on_unmount(self) -> None:
  741. """Event handler called when app is unmounted."""
  742. logger.info("App unmounting")
  743. if self.on_exit_callback:
  744. self.on_exit_callback()
  745.  
  746.  
  747. def configure_logging(debug_mode: bool = False):
  748. """Configure the logging system with appropriate levels and handlers.
  749.  
  750. Args:
  751. debug_mode: Enable debug logging if True
  752. """
  753. # Set the log level based on debug mode
  754. log_level = logging.DEBUG if debug_mode else logging.INFO
  755.  
  756. # Configure the root logger to handle all messages
  757. root_logger = logging.getLogger()
  758. root_logger.setLevel(log_level)
  759.  
  760. # Clear any existing handlers to avoid duplicates
  761. root_logger.handlers = []
  762.  
  763. # Create a handler for the console output with rich formatting
  764. console_handler = RichHandler(
  765. rich_tracebacks=True,
  766. omit_repeated_times=False,
  767. tracebacks_show_locals=debug_mode,
  768. )
  769. console_handler.setLevel(log_level)
  770.  
  771. # Create a formatter for the console output
  772. if debug_mode:
  773. # More detailed format for debug mode
  774. formatter = logging.Formatter(
  775. "%(asctime)s | %(name)s | %(levelname)s | %(filename)s:%(lineno)d | %(message)s"
  776. )
  777. else:
  778. # Simpler format for normal mode
  779. formatter = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
  780.  
  781. console_handler.setFormatter(formatter)
  782.  
  783. # Add the handler to the logger
  784. root_logger.addHandler(console_handler)
  785.  
  786. # If debug mode is enabled, also log to a file for more detailed analysis
  787. if debug_mode:
  788. # Create the logs directory if it doesn't exist
  789. log_dir = Path("logs")
  790. log_dir.mkdir(exist_ok=True)
  791.  
  792. # Create a timestamped log file
  793. log_file = log_dir / f"reclaimed_{time.strftime('%Y%m%d_%H%M%S')}.log"
  794. file_handler = logging.FileHandler(log_
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement