Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # --- DnsCryptTray - a quick and dirty python script to help easily? whitelist blocked domains
- # --- Designed and half-assed by Adrian Miller
- import sys
- import os
- import re
- import threading # Still imported, but watcher logic moved to QThread
- import time
- import json
- import subprocess
- import winreg # Used for startup registry key
- import tldextract
- from PyQt5.QtWidgets import (
- QApplication, QWidget, QSystemTrayIcon, QMenu, QAction,
- QTableWidget, QTableWidgetItem, QVBoxLayout, QHBoxLayout, QGridLayout, QPushButton, QCheckBox,
- QDialog, QMessageBox, QHeaderView, QAbstractItemView, QSizePolicy, QSpinBox, QLabel, QComboBox,
- QDialogButtonBox # Import QDialogButtonBox for custom buttons in QMessageBox
- )
- from PyQt5.QtGui import QIcon, QPixmap
- from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject, QThread # Import QThread
# --- Configuration and Constants ---
# All data files live next to the program: beside the executable when
# ``sys.frozen`` is set (bundled build), otherwise beside this source file.
BASE_DIR = os.path.dirname(
    sys.executable if getattr(sys, 'frozen', False) else os.path.abspath(__file__)
)

# Log that is read for blocked domains, and the whitelist that is written.
TEXT_FILE_PATH = os.path.join(BASE_DIR, "blocked-names.log")
OUTPUT_FILE_PATH = os.path.join(BASE_DIR, "allowed-names.txt")
# Persisted user settings (JSON).
CONFIG_FILE_PATH = os.path.join(BASE_DIR, "config.json")
# Helper script used for restarting the service; assumed to sit in BASE_DIR.
HELPER_SCRIPT_PATH = os.path.join(BASE_DIR, "restart_service_helper.py")

APP_NAME = "DnsCryptTray"
# Default number of seconds before the notification message box times out.
DEFAULT_TIMEOUT_SECONDS = 10
# Default action taken on timeout; must match the corresponding button text.
DEFAULT_PROMPT_ACTION = "Ignore"
- # --- Configuration Loading/Saving ---
def load_config():
    """Load the configuration dictionary from ``CONFIG_FILE_PATH``.

    Any problem reading the file (missing, unreadable, invalid JSON, or JSON
    whose top-level value is not an object) is reported on stdout and the
    defaults are used instead; this function does not raise for those cases.

    Returns:
        dict: The loaded settings with every known key present. Keys missing
        from the file are filled in with their default values.
    """
    print(f"Attempting to load config from: {CONFIG_FILE_PATH}")  # Debug print
    config = {}
    try:
        with open(CONFIG_FILE_PATH, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        print(f"Config file not found: {CONFIG_FILE_PATH}. Using defaults.")  # Debug print
    except json.JSONDecodeError:
        print(f"Warning: Error decoding JSON from {CONFIG_FILE_PATH}. Using defaults.")
    except Exception as e:
        print(f"Warning: Could not load config {CONFIG_FILE_PATH}: {e}. Using defaults.")
    else:
        if isinstance(loaded, dict):
            config = loaded
            print("Config loaded successfully.")  # Debug print
        else:
            # Valid JSON such as a list or a string cannot hold settings;
            # keeping it would make the key assignments below fail.
            print(f"Warning: Config {CONFIG_FILE_PATH} does not contain a JSON object. Using defaults.")

    # Ensure default values are present after loading.
    defaults = {
        "messagebox_timeout_seconds": DEFAULT_TIMEOUT_SECONDS,
        "default_prompt_action": DEFAULT_PROMPT_ACTION,
        "notifications_enabled": True,
        "run_on_startup": False,
        "restart_on_clear": False,
    }
    for key, value in defaults.items():
        config.setdefault(key, value)
    return config
def save_config(config):
    """Write ``config`` to ``CONFIG_FILE_PATH`` as indented JSON.

    The data is serialised to a string *before* the file is opened for
    writing, so a value that cannot be encoded as JSON no longer leaves a
    truncated config file behind. Errors are reported on stdout and
    swallowed; the function returns ``None`` either way.

    Args:
        config: JSON-serialisable mapping of settings.
    """
    print(f"Attempting to save config to: {CONFIG_FILE_PATH}")  # Debug print
    try:
        # Serialise first: opening with 'w' truncates the existing file.
        payload = json.dumps(config, indent=4)  # Indent for readability

        # Ensure the directory exists before saving.
        config_dir = os.path.dirname(CONFIG_FILE_PATH)
        if config_dir and not os.path.exists(config_dir):
            os.makedirs(config_dir)
            print(f"Created config directory: {config_dir}")  # Debug print

        with open(CONFIG_FILE_PATH, 'w', encoding='utf-8') as f:
            f.write(payload)
        print("Config saved successfully.")  # Debug print
    except Exception as e:
        print(f"Error saving config {CONFIG_FILE_PATH}: {e}")
- # --- Config Manager Class ---
class ConfigManager:
    """Class-level cache around ``load_config`` / ``save_config``.

    The settings dictionary is stored on the class, so every caller shares
    one copy. It is loaded lazily on first access.
    """

    # Cached settings dictionary; None until the first load.
    _config = None

    @classmethod
    def _ensure_loaded(cls):
        """Return the cached settings, loading them first when necessary."""
        if cls._config is None:
            cls.load()
        return cls._config

    @classmethod
    def load(cls):
        """(Re)read the settings from disk into the class-level cache."""
        cls._config = load_config()
        print("ConfigManager loaded config.")  # Debug print

    @classmethod
    def get(cls, key, default=None):
        """Return the value stored under ``key``, or ``default`` if absent."""
        settings = cls._ensure_loaded()
        return settings.get(key, default)

    @classmethod
    def set(cls, key, value):
        """Store ``value`` under ``key`` and write the settings to disk."""
        settings = cls._ensure_loaded()
        settings[key] = value
        print(f"ConfigManager: Setting key '{key}' to '{value}' in memory.")  # Debug print
        save_config(settings)
        print("ConfigManager: Config saved after set.")  # Debug print
- # --- Whitelist Utility Function ---
def add_domain_to_whitelist(domain_to_add):
    """Add a domain to the whitelist file at ``OUTPUT_FILE_PATH``.

    The existing entries are read and lower-cased, the new domain is added if
    it is not already there, and the sorted result is written back.

    Args:
        domain_to_add: Domain name. Surrounding whitespace and leading or
            trailing dots are removed and the name is lower-cased.

    Returns:
        bool: True if the domain is in the whitelist afterwards (newly added
        or already present). False if the domain is empty after sanitising
        or the file could not be read or written.
    """
    print(f"Attempting to add '{domain_to_add}' to whitelist...")  # Debug print
    if not domain_to_add:
        print("Warning: Attempted to add an empty domain to whitelist.")  # Debug print
        return False

    # Strip whitespace and dots repeatedly: a single strip().strip('.') would
    # leave e.g. ". example.com" as " example.com".
    cleaned = domain_to_add
    while True:
        trimmed = cleaned.strip().strip('.')
        if trimmed == cleaned:
            break
        cleaned = trimmed
    domain_to_add_lower = cleaned.lower()
    if not domain_to_add_lower:  # Check again after stripping
        print("Warning: Domain is empty after sanitization.")  # Debug print
        return False

    existing = set()
    try:
        if os.path.exists(OUTPUT_FILE_PATH):
            print(f"Reading existing whitelist from '{OUTPUT_FILE_PATH}'.")  # Debug print
            with open(OUTPUT_FILE_PATH, 'r', encoding='utf-8') as f:
                # Lower-case the stored entries for a case-insensitive check.
                existing.update(line.strip().lower() for line in f if line.strip())
            print(f"Read {len(existing)} existing domains.")  # Debug print
    except Exception as e:
        print(f"Error reading whitelist file '{OUTPUT_FILE_PATH}': {e}")  # Debug print
        return False

    if domain_to_add_lower in existing:
        print(f"Domain '{domain_to_add}' (as '{domain_to_add_lower}') is already in the whitelist.")  # Debug print
        return True  # Already exists, considered success

    print(f"Domain '{domain_to_add_lower}' is new. Adding to set.")  # Debug print
    existing.add(domain_to_add_lower)
    all_domains = sorted(existing)

    try:
        # Ensure the directory exists before saving.
        output_dir = os.path.dirname(OUTPUT_FILE_PATH)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)
            print(f"Created output directory: {output_dir}")  # Debug print
        print(f"Writing updated whitelist to '{OUTPUT_FILE_PATH}'. Total {len(all_domains)} domains.")  # Debug print
        with open(OUTPUT_FILE_PATH, 'w', encoding='utf-8') as f:
            f.write(''.join(domain + '\n' for domain in all_domains))
        print(f"Successfully added '{domain_to_add}' (as '{domain_to_add_lower}') to whitelist '{OUTPUT_FILE_PATH}'.")  # Debug print
        return True
    except Exception as e:
        print(f"Error writing whitelist file '{OUTPUT_FILE_PATH}': {e}")  # Debug print
        return False
- # --- File Watcher Class (using QThread) ---
class FileWatcher(QObject):
    """Poll the blocked-names log from a worker QThread and report domains.

    The watcher moves itself onto a private ``QThread``; ``watch_file`` runs
    there once the thread starts. It remembers a byte offset into the log,
    reads what has been appended past that offset, parses each complete line
    with ``parse_line`` and emits ``new_domain_signal`` once per domain found.
    No de-duplication happens here: every parsed domain is emitted.
    """

    # Emitted with the domain string (original case, stripped) for each
    # domain found in newly appended log lines.
    new_domain_signal = pyqtSignal(str)

    def __init__(self, filepath):
        """Create the watcher for ``filepath`` and move it to its own thread.

        The thread is created but not started; call ``start_watching``.
        """
        super().__init__()
        self.filepath = filepath
        self._stop = False
        self._last_position = 0  # Byte offset of the first unread log byte
        self._last_inode = None  # st_ino of the file at the last poll, if any
        self._last_mtime = None  # st_mtime of the file at the last poll
        self._thread = QThread()
        self.moveToThread(self._thread)
        # Run the polling loop as soon as the thread starts.
        self._thread.started.connect(self.watch_file)
        # NOTE: finished is intentionally NOT connected to deleteLater().
        # start_watching()/stop_watching() keep calling self._thread after the
        # thread has finished, which fails once the Qt object is deleted.
        print("FileWatcher initialized and moved to QThread.")  # Debug print

    def start_watching(self):
        """Start the worker thread unless it is already running."""
        if not self._thread.isRunning():
            print("FileWatcher QThread starting...")  # Debug print
            self._stop = False  # Ensure stop flag is false before starting
            self._thread.start()
        else:
            print("FileWatcher QThread is already running.")  # Debug print

    def stop_watching(self):
        """Ask the polling loop to stop and the thread to quit (non-blocking)."""
        if self._thread.isRunning():
            print("Requesting FileWatcher QThread stop.")  # Debug print
            self._stop = True  # Signal the loop to stop
            self._thread.quit()
            print("FileWatcher QThread stop requested.")  # Debug print
        else:
            print("FileWatcher QThread is not running.")  # Debug print

    def watch_file(self):
        """Worker-thread entry point: tail the log until a stop is requested."""
        print(f"FileWatcher thread running. Monitoring: {self.filepath}")  # <-- Debug print
        if not self._seek_to_end_of_existing_log():
            return
        print("Entering file monitoring loop...")  # <-- Debug print
        while not self._stop:
            try:
                self._poll_once()
                self._interruptible_sleep(0.5)  # Prevent busy-waiting
            except FileNotFoundError:
                self._wait_for_file_to_reappear()
            except Exception as e:
                print(f"FATAL ERROR: Unhandled exception during file watching loop for {self.filepath}: {e}")  # <-- Debug print
                self._interruptible_sleep(5)  # Wait longer after an error
        print(f"FileWatcher thread stopping gracefully for: {self.filepath}")  # <-- Debug print

    def _interruptible_sleep(self, seconds):
        """Sleep up to ``seconds``, returning early once a stop is requested."""
        deadline = time.monotonic() + seconds
        while not self._stop:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(0.1, remaining))

    def _remember_stats(self, stat_info):
        """Record the identity and mtime of the file as of ``stat_info``."""
        self._last_inode = getattr(stat_info, 'st_ino', None)
        self._last_mtime = stat_info.st_mtime

    def _seek_to_end_of_existing_log(self):
        """Wait for the log to exist, then start tailing from its current end.

        Returns:
            bool: False if a stop was requested while waiting, or the file
            could not be opened or stat'ed; the watcher must then exit.
        """
        try:
            if not os.path.exists(self.filepath):
                print(f"Warning: Monitored file not found at startup: {self.filepath}. Waiting for it...")  # <-- Debug print
                while not os.path.exists(self.filepath) and not self._stop:
                    self._interruptible_sleep(1)
                if self._stop:
                    print("Watcher stopping during initial wait for file.")  # <-- Debug print
                    return False
            print(f"File found: {self.filepath}. Attempting initial access.")  # <-- Debug print
            # Binary mode: the offset must be a byte count so that it can be
            # compared with st_size later on.
            with open(self.filepath, 'rb') as f:
                self._last_position = f.seek(0, os.SEEK_END)
            print(f"Initial seek successful. Starting position: {self._last_position}")  # <-- Debug print
            stat_info = os.stat(self.filepath)
            self._remember_stats(stat_info)
            print(f"Initial file stats: size={stat_info.st_size}, mtime={self._last_mtime}, inode={self._last_inode}")  # <-- Debug print
        except Exception as e:
            print(f"ERROR: Initial file access or stat failed for {self.filepath}: {e}. Watcher thread is stopping.")  # <-- Debug print
            return False
        return True

    def _poll_once(self):
        """Check the file once and emit domains from any newly appended lines.

        Raises:
            FileNotFoundError: If the monitored file no longer exists.
        """
        stat_info = os.stat(self.filepath)
        current_size = stat_info.st_size
        current_inode = getattr(stat_info, 'st_ino', None)
        current_mtime = stat_info.st_mtime

        # A different inode or an mtime that went backwards is treated as the
        # file having been replaced: start again from its beginning.
        inode_changed = (
            self._last_inode is not None
            and current_inode is not None
            and current_inode != self._last_inode
        )
        mtime_regressed = self._last_mtime is not None and current_mtime < self._last_mtime
        if inode_changed:
            print(f"Warning: File inode changed for {self.filepath}. File replaced? Resetting position.")  # <-- Debug print
        elif mtime_regressed:
            print(f"Warning: File modification time went backward for {self.filepath}. File replaced? Resetting position.")  # <-- Debug print
        if inode_changed or mtime_regressed:
            self._last_position = 0
            print(f"Stats after file replacement detection: size={current_size}, mtime={current_mtime}, inode={current_inode}")  # <-- Debug print

        # Truncation (e.g. the log was cleared): the old offset is past EOF.
        if current_size < self._last_position:
            print(f"Log file truncation detected. Size {current_size} < LastPos {self._last_position}. Resetting position.")  # <-- Debug print
            self._last_position = 0

        if current_size > self._last_position:
            try:
                self._emit_domains_from_new_lines()
            except Exception as e:
                # Best effort: report and try again on the next poll.
                print(f"ERROR reading new data from {self.filepath}: {e}")  # <-- Debug print

        self._remember_stats(stat_info)

    def _emit_domains_from_new_lines(self):
        """Read complete lines past the stored offset and emit their domains.

        Only data up to the last newline is consumed. A trailing line that
        the writer has not finished yet stays unread until its newline
        arrives, so it is never parsed in two halves.
        """
        with open(self.filepath, 'rb') as f:
            f.seek(self._last_position)
            chunk = f.read()
        consumed = chunk.rfind(b'\n') + 1
        if consumed == 0:
            return  # No complete line yet
        new_lines = chunk[:consumed].decode('utf-8', errors='ignore').split('\n')
        new_lines.pop()  # Drop the empty string after the final newline
        new_position = self._last_position + consumed
        print(f"Read {len(new_lines)} new line(s). Updating position from {self._last_position} to {new_position}.")  # <-- Debug print
        self._last_position = new_position
        for line in new_lines:
            if not line.strip():
                continue
            print(f"Processing line: {line.strip()}")  # <-- Debug print
            domains_in_line = self.parse_line(line)
            print(f"Parsed domains from line: {domains_in_line}")  # <-- Debug print
            for domain in domains_in_line:
                cleaned = domain.strip()
                if cleaned:
                    print(f"Watcher found domain: {domain}. Emitting signal.")  # <-- Debug print
                    self.new_domain_signal.emit(cleaned)  # Original case

    def _wait_for_file_to_reappear(self):
        """Reset the tailing state after the file vanished and wait briefly.

        With the offset back at 0, a file that reappears is read from its
        beginning on the next poll.
        """
        print(f"Warning: Monitored file {self.filepath} is not found (in loop). Resetting state. Waiting 5s...")  # <-- Debug print
        self._last_position = 0
        self._last_inode = None
        self._last_mtime = None
        self._interruptible_sleep(5)
        if self._stop:
            return
        if os.path.exists(self.filepath):
            print(f"File {self.filepath} reappeared after wait.")  # <-- Debug print
        else:
            print(f"File {self.filepath} still missing after wait.")  # <-- Debug print

    def parse_line(self, line):
        """Extract domain names from one log line.

        If the line contains one or more ``(alias for [name])`` fragments,
        those names are returned. Otherwise the line is split on tabs and the
        third field is returned when it contains a dot and no space.

        This method does not use ``self``.

        Returns:
            set: Domain strings in their original case; may be empty.
        """
        domains = set()
        alias_matches = re.findall(r'\s\(alias for\s+\[([^\]]+)\]\)', line)
        if alias_matches:
            for alias in alias_matches:
                domains.add(alias.strip())
        else:
            parts = line.strip().split('\t')
            if len(parts) >= 3:
                potential_domain = parts[2].strip()
                # Basic validation: at least one dot and no spaces.
                if '.' in potential_domain and ' ' not in potential_domain:
                    domains.add(potential_domain)
        return domains

    def filter_domain(self, domain):
        """Placeholder filter hook; currently accepts every domain."""
        return True
- # --- Main Dialog Class ---
- class DomainListDialog(QDialog):
- """Dialog to display blocked domains and manage whitelisting."""
def __init__(self, parent=None):
    """Build the blocked-domains window: table, settings and buttons.

    Widgets are created in a fixed order, the table is filled once from the
    log via ``populate_list``, and only then are the signals connected.

    Args:
        parent: Optional parent widget, forwarded to ``QDialog``.
    """
    super().__init__(parent)
    self.setWindowTitle("DNSCrypt-Proxy Blocked Domains")
    self.resize(700, 550)

    # NOTE(review): this attribute hides the inherited QWidget.layout()
    # method on the instance; kept because other methods may rely on it.
    self.layout = QVBoxLayout(self)

    # --- Table: [full checkbox | full domain | base checkbox | base domain] ---
    self.table_widget = QTableWidget(self)
    table = self.table_widget
    # Both headers are hidden; column meaning comes from tooltips and content.
    table.verticalHeader().setVisible(False)
    table.horizontalHeader().setVisible(False)
    table.setColumnCount(4)
    column_modes = (
        QHeaderView.ResizeToContents,  # Whitelist-full checkbox
        QHeaderView.Stretch,           # Full domain text
        QHeaderView.ResizeToContents,  # Whitelist-base checkbox
        QHeaderView.Stretch,           # Base domain text
    )
    for column, mode in enumerate(column_modes):
        table.horizontalHeader().setSectionResizeMode(column, mode)
    table.verticalHeader().setDefaultSectionSize(32)  # Row height
    table.setSortingEnabled(True)
    table.setSelectionBehavior(QAbstractItemView.SelectRows)
    # Selection is disabled so it does not interfere with the checkboxes.
    table.setSelectionMode(QAbstractItemView.NoSelection)
    self.layout.addWidget(table)

    # --- Settings Section ---
    settings_layout = QVBoxLayout()
    settings_layout.setContentsMargins(0, 0, 0, 0)

    self.checkbox_notify = QCheckBox("Enable domain blocking notifications/prompts", self)
    self.checkbox_notify.setChecked(ConfigManager.get("notifications_enabled", True))
    settings_layout.addWidget(self.checkbox_notify)

    # Prompt timeout, limited to 5-60 seconds.
    self.timeout_spinbox = QSpinBox(self)
    self.timeout_spinbox.setRange(5, 60)
    self.timeout_spinbox.setValue(ConfigManager.get("messagebox_timeout_seconds", DEFAULT_TIMEOUT_SECONDS))
    self.timeout_spinbox.setSuffix(" seconds")
    self.timeout_label = QLabel("Prompt timeout:", self)
    self.timeout_label.setTextFormat(Qt.RichText)
    timeout_row = QHBoxLayout()
    timeout_row.addWidget(self.timeout_label)
    timeout_row.addWidget(self.timeout_spinbox)
    timeout_row.addStretch()  # Keep the widgets on the left
    settings_layout.addLayout(timeout_row)

    # Default action when the prompt times out.
    self.default_action_combo = QComboBox(self)
    self.default_action_combo.addItems(["Ignore", "Whitelist Full", "Whitelist Base"])
    saved_action = ConfigManager.get("default_prompt_action", DEFAULT_PROMPT_ACTION)
    saved_index = self.default_action_combo.findText(saved_action, Qt.MatchExactly)
    if saved_index == -1:
        # Unknown saved value: fall back to the default and persist it.
        print(f"Warning: Invalid saved default prompt action '{saved_action}'. Resetting to default.")  # Debug print
        self.default_action_combo.setCurrentText(DEFAULT_PROMPT_ACTION)
        ConfigManager.set("default_prompt_action", DEFAULT_PROMPT_ACTION)
    else:
        self.default_action_combo.setCurrentIndex(saved_index)
    self.default_action_label = QLabel("Default prompt action on timeout:", self)
    self.default_action_label.setTextFormat(Qt.RichText)
    action_row = QHBoxLayout()
    action_row.addWidget(self.default_action_label)
    action_row.addWidget(self.default_action_combo)
    action_row.addStretch()  # Keep the widgets on the left
    settings_layout.addLayout(action_row)

    self.checkbox_startup = QCheckBox("Run this application on system startup", self)
    self.checkbox_startup.setChecked(ConfigManager.get("run_on_startup", False))
    settings_layout.addWidget(self.checkbox_startup)

    self.checkbox_restart_on_clear = QCheckBox("Auto-restart dnscrypt-proxy after clearing log", self)
    self.checkbox_restart_on_clear.setChecked(ConfigManager.get("restart_on_clear", False))
    settings_layout.addWidget(self.checkbox_restart_on_clear)

    self.layout.addLayout(settings_layout)

    # --- Button Section ---
    button_layout = QGridLayout()
    button_layout.setContentsMargins(0, 10, 0, 0)  # Top margin only
    self.button_save = QPushButton("Whitelist selected domain(s)", self)
    self.button_clear_input = QPushButton("Clear blocked-names.log", self)
    self.button_restart_service = QPushButton("Restart dnscrypt-proxy Service", self)
    self.button_refresh_list = QPushButton("Refresh List from Log File", self)
    self.button_close = QPushButton("Close Window", self)
    grid_cells = (
        (self.button_save, 0, 0),
        (self.button_clear_input, 0, 1),
        (self.button_restart_service, 1, 0),
        (self.button_refresh_list, 1, 1),
    )
    for button, grid_row, grid_column in grid_cells:
        button_layout.addWidget(button, grid_row, grid_column)
    button_layout.addWidget(self.button_close, 2, 0, 1, 2)  # Spans both columns
    self.layout.addLayout(button_layout)

    # Lower-cased domains currently shown in the table.
    self.domains_set = set()
    self.populate_list()  # Initial population

    # --- Connections ---
    wiring = (
        (self.button_save.clicked, self.save_selected_domains_from_table),
        (self.button_clear_input.clicked, self.clear_input_file),
        (self.button_restart_service.clicked, self.restart_dnscrypt_proxy),
        (self.button_refresh_list.clicked, self.populate_list),
        (self.button_close.clicked, self.hide),  # Hide instead of close
        (self.checkbox_notify.stateChanged, self.save_settings),
        (self.timeout_spinbox.valueChanged, self.save_timeout_setting),
        (self.default_action_combo.currentIndexChanged, self.save_default_action_setting),
        (self.checkbox_startup.stateChanged, self.toggle_startup),
        (self.checkbox_restart_on_clear.stateChanged, self.save_settings),
    )
    for signal, slot in wiring:
        signal.connect(slot)
def closeEvent(self, event):
    """Handle a window-close request by hiding the dialog.

    The close event is ignored, so the dialog is only hidden and can be
    shown again later.
    """
    print("DomainListDialog close event received. Hiding window.")  # Debug print
    # Reject the close itself, then take the window off screen.
    event.ignore()
    self.hide()
def populate_list(self):
    """Rebuild the table from the last 500 lines of the blocked-names log.

    Each line is parsed with ``FileWatcher.parse_line``. Domains are
    de-duplicated case-insensitively, keeping the first spelling seen for
    display, and added in sorted order through ``add_domain_row``.

    If the log is missing or cannot be read, a message box is shown, the
    table and ``self.domains_set`` are cleared, and the method returns.
    """
    from collections import deque  # Local import: only needed here

    print(f"Populating list from {TEXT_FILE_PATH}...")  # Debug print
    if not os.path.exists(TEXT_FILE_PATH):
        QMessageBox.warning(self, "File Not Found", f"{os.path.basename(TEXT_FILE_PATH)} not found.")
        print(f"Log file not found: {TEXT_FILE_PATH}. Clearing table.")  # Debug print
        self.table_widget.setRowCount(0)
        self.domains_set.clear()
        return

    # Keep only the tail while streaming, so a large log is never held in
    # memory as a whole.
    lines = deque(maxlen=500)
    total_lines = 0
    try:
        print("Reading lines from log file...")  # Debug print
        with open(TEXT_FILE_PATH, 'r', encoding='utf-8', errors='ignore') as file:
            for line in file:
                total_lines += 1
                lines.append(line)
        print(f"Finished reading {total_lines} total lines from log, processing last {len(lines)}.")  # Debug print
    except Exception as e:
        QMessageBox.critical(self, "Error Reading Log", f"Could not read {os.path.basename(TEXT_FILE_PATH)}:\n{e}")
        print(f"Error reading log file {TEXT_FILE_PATH}: {e}. Clearing table.")  # Debug print
        self.table_widget.setRowCount(0)
        self.domains_set.clear()
        return

    # A temporary watcher is created only to reuse its parse_line method;
    # it is never started.
    temp_watcher = FileWatcher(TEXT_FILE_PATH)
    domain_display_map = {}  # Lower-cased domain -> first spelling seen
    for line in lines:
        for domain in temp_watcher.parse_line(line):
            display_domain = domain.strip()
            domain_lower = display_domain.lower()
            if domain_lower and domain_lower not in domain_display_map:
                domain_display_map[domain_lower] = display_domain
    print(f"Parsed {len(domain_display_map)} unique domains (lowercase) from recent log entries.")  # Debug print

    # Sorting is switched off while rows are inserted and is always
    # restored, even if building a row raises.
    self.table_widget.setSortingEnabled(False)
    try:
        self.table_widget.setRowCount(0)
        self.domains_set.clear()
        for domain_lower in sorted(domain_display_map):
            self.add_domain_row(domain_display_map[domain_lower])
        print(f"Table populated with {self.table_widget.rowCount()} rows.")  # Debug print
    finally:
        self.table_widget.setSortingEnabled(True)
def add_domain_row(self, full_domain):
    """Append a blocked domain to the table unless it is already listed.

    A row consists of a "whitelist full domain" checkbox, the full domain
    text, a "whitelist base domain" checkbox and the base domain text. The
    two checkboxes of a row are mutually exclusive.

    Args:
        full_domain: Domain name as it should be displayed. Duplicates are
            detected through ``self.domains_set`` using the name with
            surrounding whitespace/dots removed and lower-cased.
    """
    domain_key = full_domain.strip().strip('.').lower()
    if not domain_key or domain_key in self.domains_set:
        return  # Empty, or already shown (case-insensitive comparison).
    self.domains_set.add(domain_key)

    # Base domain is "<domain>.<suffix>" as split by tldextract. When the
    # split fails or does not yield both parts, the full domain is used.
    base_domain = full_domain
    try:
        parts = tldextract.extract(full_domain)
        if parts.domain and parts.suffix:
            base_domain = f"{parts.domain}.{parts.suffix}"
    except Exception as e:
        print(f"Warning: tldextract failed for '{full_domain}': {e}. Using full domain as base.")
        base_domain = full_domain

    def make_checkbox_cell(tooltip):
        """Return (checkbox, container) with the checkbox centred in its cell."""
        checkbox = QCheckBox()
        layout = QHBoxLayout()
        layout.setContentsMargins(0, 0, 0, 0)
        layout.setSpacing(0)
        layout.addStretch()
        layout.addWidget(checkbox)
        layout.addStretch()
        container = QWidget()
        container.setLayout(layout)
        container.setToolTip(tooltip)
        container.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Preferred)
        return checkbox, container

    def make_readonly_item(text, tooltip):
        """Return a table item that can be neither edited nor checked."""
        item = QTableWidgetItem(text)
        item.setFlags(item.flags() & ~Qt.ItemIsUserCheckable & ~Qt.ItemIsEditable)
        item.setToolTip(tooltip)
        return item

    checkbox_full, full_cell = make_checkbox_cell(
        f"Check to whitelist the full domain: {full_domain}")
    checkbox_base, base_cell = make_checkbox_cell(
        f"Check to whitelist the base domain: {base_domain}")

    table = self.table_widget
    # With sorting enabled, setItem() may move the row immediately, so the
    # remaining cells would be written to whatever row now sits at `row`.
    # Suspend sorting while the row is assembled and restore it afterwards
    # (re-enabling sorting re-sorts the table, placing the new row).
    sorting_was_enabled = table.isSortingEnabled()
    table.setSortingEnabled(False)
    try:
        row = table.rowCount()
        table.insertRow(row)
        table.setCellWidget(row, 0, full_cell)
        table.setItem(row, 1, make_readonly_item(
            full_domain, f"Full blocked domain: {full_domain}"))
        table.setCellWidget(row, 2, base_cell)
        table.setItem(row, 3, make_readonly_item(
            base_domain, f"Base domain: {base_domain}"))
    finally:
        table.setSortingEnabled(sorting_was_enabled)

    # Checking one box of the row unchecks the other one.
    checkbox_full.stateChanged.connect(
        lambda state, changed=checkbox_full, other=checkbox_base:
        self.handle_checkbox_state_change(changed, other, state))
    checkbox_base.stateChanged.connect(
        lambda state, changed=checkbox_base, other=checkbox_full:
        self.handle_checkbox_state_change(changed, other, state))
- # Modified handle_checkbox_state_change to accept checkbox objects directly
def handle_checkbox_state_change(self, changed_checkbox, other_checkbox, state):
    """Keep the two checkboxes of a table row mutually exclusive.

    Args:
        changed_checkbox: The checkbox whose state just changed (unused
            here; kept so both connections share one signature).
        other_checkbox: The sibling checkbox in the same row, or None.
        state: New state value delivered by ``stateChanged``.
    """
    # Only a box that is being checked affects its sibling; unchecking a
    # box leaves the other one alone.
    if state != Qt.Checked:
        return
    if other_checkbox and other_checkbox.isChecked():
        # setChecked() takes a bool (Qt.CheckState belongs to
        # setCheckState()); this matches the other setChecked(False) calls.
        other_checkbox.setChecked(False)
def save_selected_domains_from_table(self):
    """Whitelist the domains whose checkboxes are ticked in the table.

    For each row the "full domain" checkbox (column 0, text in column 1)
    takes precedence over the "base domain" checkbox (column 2, text in
    column 3). After the user confirms, every selected domain is passed to
    ``add_domain_to_whitelist``; rows whose domain was stored are unchecked
    and the user is offered a restart of the dnscrypt-proxy service.
    """
    print("Saving selected domains from table...")
    table = self.table_widget

    def checkbox_at(row, column):
        """Return the QCheckBox embedded in the given cell, or None."""
        container = table.cellWidget(row, column)
        return container.findChild(QCheckBox) if container else None

    def cell_text(row, column):
        """Return the stripped text of the given cell ('' when it has no item)."""
        item = table.item(row, column)
        return item.text().strip() if item else ""

    # (checkbox column, text column, label used in the debug output)
    choices = ((0, 1, "full"), (2, 3, "base"))

    # Lower-cased so that the same domain selected twice is written once.
    domains_to_whitelist = set()
    for row in range(table.rowCount()):
        for checkbox_col, text_col, label in choices:
            checkbox = checkbox_at(row, checkbox_col)
            if not (checkbox and checkbox.isChecked()):
                continue
            domain_text = cell_text(row, text_col)
            if domain_text:
                domains_to_whitelist.add(domain_text.lower())
                print(f"Selected {label} domain for whitelisting: {domain_text}")
                break  # At most one domain per row.

    if not domains_to_whitelist:
        print("No domains selected for whitelisting.")
        QMessageBox.information(self, "No Selection", "No domains were checked for whitelisting.")
        return

    print(f"Attempting to whitelist {len(domains_to_whitelist)} unique domains (lowercase).")
    confirm_msg = f"Add the following {len(domains_to_whitelist)} domain(s) to '{os.path.basename(OUTPUT_FILE_PATH)}'?\n\n"
    confirm_msg += "\n".join(sorted(domains_to_whitelist))
    reply = QMessageBox.question(self, 'Confirm Whitelist', confirm_msg,
                                 QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
    if reply != QMessageBox.Yes:
        return

    print("User confirmed whitelisting. Adding domains...")
    succeeded = set()
    failed_count = 0
    for domain_lower in sorted(domains_to_whitelist):
        # A truthy result is counted as "added or already present".
        if add_domain_to_whitelist(domain_lower):
            succeeded.add(domain_lower)
        else:
            failed_count += 1
    added_count = len(succeeded)

    feedback_msg = f"Finished adding domains:\n- Added/Already Present: {added_count}\n- Failed: {failed_count}"
    print(feedback_msg)
    QMessageBox.information(self, "Whitelist Update Complete", feedback_msg)

    # Untick only the rows whose domain was actually stored, so a failed
    # entry stays selected and can simply be retried.
    for row in range(table.rowCount()):
        for checkbox_col, text_col, _label in choices:
            checkbox = checkbox_at(row, checkbox_col)
            if (checkbox and checkbox.isChecked()
                    and cell_text(row, text_col).lower() in succeeded):
                checkbox.setChecked(False)

    if added_count > 0:
        print("Domains added/present, prompting for service restart.")
        restart_reply = QMessageBox.question(
            self, 'Restart Service?',
            "Domains added to whitelist.\nRestart dnscrypt-proxy service now for changes to take effect?",
            QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if restart_reply == QMessageBox.Yes:
            print("User chose to restart service.")
            self.restart_dnscrypt_proxy()
        else:
            print("User chose NOT to restart service.")
    else:
        print("No domains were actually added/new. No restart prompt.")
def clear_input_file(self):
    """Truncate the blocked-names log after confirmation and reset the views.

    On success the table, the dialog's ``domains_set`` and the tray icon's
    ``seen_domains`` are cleared. The file watcher is stopped around the
    truncation and restarted afterwards, and the service is restarted when
    the "restart on clear" checkbox is ticked.
    """
    print(f"Attempting to clear log file: {TEXT_FILE_PATH}")
    confirm = QMessageBox.question(
        self, "Confirm Clear",
        f"Are you sure you want to permanently clear the log file:\n{os.path.basename(TEXT_FILE_PATH)}?",
        QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
    if confirm != QMessageBox.Yes:
        print("User cancelled log file clear.")
        return
    print("User confirmed clearing log file.")

    # The dialog is created without a parent, so parent() alone cannot
    # reach the tray icon. Fall back to searching the application's
    # children. NOTE(review): this assumes the TrayIcon was constructed
    # with the QApplication as its parent -- confirm at the creation site.
    tray_icon = self.parent()
    if not isinstance(tray_icon, TrayIcon):
        app = QApplication.instance()
        tray_icon = app.findChild(TrayIcon) if app is not None else None
    watcher = getattr(tray_icon, "file_watcher", None)

    try:
        watcher_stopped = False
        try:
            if watcher:
                print("Stopping file watcher before clearing log.")
                watcher.stop_watching()
                watcher_stopped = True

            # Opening in write mode truncates the file.
            with open(TEXT_FILE_PATH, 'w', encoding='utf-8'):
                pass
            print(f"{TEXT_FILE_PATH} has been cleared (truncated).")

            self.table_widget.setRowCount(0)
            self.domains_set.clear()
            print("Table view and internal domain set cleared.")

            # Forget the session's notified domains so that future log
            # entries trigger notifications again.
            if tray_icon is not None:
                print("Clearing TrayIcon's seen_domains set.")
                tray_icon.seen_domains.clear()
            else:
                print("Warning: Could not access TrayIcon to clear seen_domains.")

            QMessageBox.information(self, "Cleared", f"{os.path.basename(TEXT_FILE_PATH)} has been cleared.\nTable view reset.")
        finally:
            # Runs on failure too: a watcher that was stopped must not stay
            # stopped just because the truncation failed.
            if watcher_stopped:
                print("Restarting file watcher after clearing log.")
                watcher.start_watching()
            else:
                print("Warning: Could not access file watcher to restart after clear.")

        # Done only after the watcher has been restarted.
        if self.checkbox_restart_on_clear.isChecked():
            print("Auto-restart on clear is enabled. Attempting service restart.")
            self.restart_dnscrypt_proxy()
        else:
            print("Auto-restart on clear is disabled.")
    except Exception as e:
        print(f"ERROR clearing file '{TEXT_FILE_PATH}': {e}")
        QMessageBox.critical(self, "Error", f"Could not clear file '{os.path.basename(TEXT_FILE_PATH)}':\n{e}")
def restart_dnscrypt_proxy(self):
    """Launch the helper script elevated to restart the dnscrypt-proxy service.

    The helper is started through PowerShell's ``Start-Process -Verb RunAs``
    so that Windows shows a UAC prompt. This method only issues the command;
    it does not wait for, or check the outcome of, the helper.
    """
    print(f"Attempting to restart service using helper script: {HELPER_SCRIPT_PATH}")
    if not os.path.exists(HELPER_SCRIPT_PATH):
        print(f"ERROR: Helper script missing: {HELPER_SCRIPT_PATH}")
        QMessageBox.critical(
            self, "Helper Script Missing",
            f"Cannot restart service: The required helper script '{os.path.basename(HELPER_SCRIPT_PATH)}' was not found in the application directory:\n{BASE_DIR}")
        return

    def ps_literal(text):
        """Quote text as a PowerShell single-quoted string (' is doubled)."""
        return "'" + text.replace("'", "''") + "'"

    # NOTE(review): in a frozen build sys.executable is the bundled
    # application itself rather than a Python interpreter -- confirm that
    # launching it with the helper path as argument does what is intended.
    python_executable = sys.executable
    # The embedded double quotes keep a path containing spaces together as
    # one argument for the started process.
    helper_argument = f'"{HELPER_SCRIPT_PATH}"'
    start_process = (
        f"Start-Process -FilePath {ps_literal(python_executable)} "
        f"-ArgumentList {ps_literal(helper_argument)} -Verb RunAs"
    )
    command = ["powershell", "-WindowStyle", "Hidden", "-Command", start_process]

    try:
        print(f"Issuing restart command: {' '.join(command)}")
        # powershell is a regular executable, so no intermediate shell is
        # needed. CREATE_NO_WINDOW keeps a console from flashing up when the
        # application itself runs without one.
        subprocess.Popen(
            command,
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )
        print("Restart command issued via Start-Process.")
        QMessageBox.information(self, "Service Restart", "Attempting to restart dnscrypt-proxy service with administrator privileges (check for UAC prompt).\n\nNote: Success depends on the helper script and service configuration.")
    except Exception as e:
        print(f"ERROR issuing restart command: {e}")
        QMessageBox.critical(self, "Error", f"Failed to issue restart command:\n\n{e}")
def save_settings(self):
    """Persist the state of the three general option checkboxes."""
    print("Saving general settings...")
    checkbox_by_key = (
        ("notifications_enabled", self.checkbox_notify),
        ("run_on_startup", self.checkbox_startup),
        ("restart_on_clear", self.checkbox_restart_on_clear),
    )
    for config_key, checkbox in checkbox_by_key:
        ConfigManager.set(config_key, checkbox.isChecked())
    print("General settings saved.")
def save_timeout_setting(self):
    """Persist the notification timeout chosen in the spin box."""
    seconds = self.timeout_spinbox.value()
    ConfigManager.set("messagebox_timeout_seconds", seconds)
    print(f"Timeout setting saved: {seconds} seconds.")
def save_default_action_setting(self):
    """Persist the default prompt action selected in the combo box."""
    chosen_action = self.default_action_combo.currentText()
    ConfigManager.set("default_prompt_action", chosen_action)
    print(f"Default action setting saved: {chosen_action}.")
def toggle_startup(self):
    """Apply the 'Run on startup' checkbox to the HKCU Run registry key.

    The checkbox state is saved first. When checked, a value named APP_NAME
    holding the quoted application path is written; when unchecked, that
    value is deleted (a missing value is not an error). If the registry
    cannot be modified, an error is shown and the checkbox and the saved
    setting are reverted.
    """
    print(f"Toggling startup setting to: {self.checkbox_startup.isChecked()}")
    self.save_settings()
    run_at_startup = self.checkbox_startup.isChecked()
    app_name_reg = APP_NAME

    # NOTE(review): when running from source this registers the script file
    # itself, which relies on the .py file association -- confirm that this
    # is what is wanted.
    app_path = os.path.join(BASE_DIR, os.path.basename(sys.executable if getattr(sys, 'frozen', False) else __file__))

    try:
        # KEY_SET_VALUE is all that writing or deleting a value requires.
        # The context manager closes the handle even when a call fails.
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER,
                            r"Software\Microsoft\Windows\CurrentVersion\Run",
                            0, winreg.KEY_SET_VALUE) as key:
            if run_at_startup:
                # Quoted so that a path containing spaces stays in one piece.
                quoted_app_path = f'"{app_path}"'
                winreg.SetValueEx(key, app_name_reg, 0, winreg.REG_SZ, quoted_app_path)
                print(f"Set registry key HKCU\\...\\Run\\{app_name_reg} to {quoted_app_path}")
                message = "Application will run on system startup."
            else:
                try:
                    winreg.DeleteValue(key, app_name_reg)
                    print(f"Deleted registry key HKCU\\...\\Run\\{app_name_reg}")
                    message = "Application will no longer run on system startup."
                except FileNotFoundError:
                    # Nothing was registered, so there is nothing to remove.
                    print(f"Registry key HKCU\\...\\Run\\{app_name_reg} not found, no action needed.")
                    message = "Application was not set to run on startup."
    except OSError as e:
        print(f"ERROR modifying Windows registry for startup: {e}")
        QMessageBox.critical(self, "Startup Setting Error",
                             f"Could not modify Windows registry for startup setting:\n{e}\n\nTry running the application with administrator privileges if the issue persists.")
        # Put the checkbox back without emitting its signals, so that the
        # revert cannot re-enter this slot and repeat the failing attempt.
        previously_blocked = self.checkbox_startup.blockSignals(True)
        try:
            self.checkbox_startup.setChecked(not run_at_startup)
        finally:
            self.checkbox_startup.blockSignals(previously_blocked)
        # Keep the saved setting in line with what the registry really holds.
        self.save_settings()
        return

    QMessageBox.information(self, "Startup Setting", message)
- # --- System Tray Icon ---
- class TrayIcon(QSystemTrayIcon):
def __init__(self, icon, parent=None):
    """Set up the tray icon, its context menu, notification timers and file watcher.

    Args:
        icon: Icon shown in the system tray.
        parent: Optional QObject parent forwarded to QSystemTrayIcon.
    """
    super().__init__(icon, parent)
    self.app = QApplication.instance()
    self.dialog = None  # Main dialog, created on first use by show_dialog().

    # State of the notification message box that is currently shown, if any.
    self._current_msg_box = None
    self._msg_box_timer_label = None
    self._countdown_value = 0

    # Lower-cased domains already announced during this session.
    self.seen_domains = set()

    # --- Context menu ---
    # A QMenu parent must be a QWidget or None; the tray icon is neither a
    # widget nor is the application, so the menu is created without parent.
    self.menu = QMenu(None)
    self.show_dialog_action = QAction("Show Blocked Domains", self)
    self.show_dialog_action.triggered.connect(self.show_dialog)
    self.menu.addAction(self.show_dialog_action)
    self.exit_action = QAction("Exit", self)
    self.exit_action.triggered.connect(self.exit_app)
    self.menu.addAction(self.exit_action)
    self.setContextMenu(self.menu)
    self.setToolTip(APP_NAME)
    self.activated.connect(self._handle_tray_activation)

    # --- Message box timers ---
    # Single-shot timer that closes the notification box ...
    self._msg_box_close_timer = QTimer(self)
    self._msg_box_close_timer.setSingleShot(True)
    self._msg_box_close_timer.timeout.connect(self._handle_msg_box_timeout)
    # ... and a repeating timer that refreshes its countdown label.
    self._msg_box_countdown_timer = QTimer(self)
    self._msg_box_countdown_timer.setSingleShot(False)
    self._msg_box_countdown_timer.timeout.connect(self._update_msg_box_timer_label)

    # --- File watcher ---
    # Started last, so that handle_new_domain() can never run before the
    # timers and message-box state above exist.
    self.file_watcher = FileWatcher(TEXT_FILE_PATH)
    self.file_watcher.new_domain_signal.connect(self.handle_new_domain)
    self.file_watcher.start_watching()
def show_dialog(self):
    """Show the main dialog, creating it on first use and refreshing it afterwards."""
    print("Tray icon clicked: Showing dialog.")
    dialog = self.dialog
    if dialog is not None:
        print("Dialog exists, showing existing instance.")
        # An existing dialog may be stale, so reload its list before showing.
        dialog.populate_list()
    else:
        print("Dialog is None, creating new dialog instance.")
        # A QDialog parent has to be a QWidget or None, which the tray icon
        # is not; hence no parent.
        dialog = DomainListDialog(None)
        self.dialog = dialog
    dialog.show()
    # Bring the window to the front.
    dialog.activateWindow()
    dialog.raise_()
- # ADDED: Handler for tray icon activation
def _handle_tray_activation(self, reason):
    """Open the dialog when the tray icon is activated by a primary click.

    Args:
        reason: Activation reason delivered by the ``activated`` signal.
    """
    # Only Trigger (the primary, usually left, click) is handled; context
    # and double clicks are ignored here.
    if reason != QSystemTrayIcon.ActivationReason.Trigger:
        return
    print("Tray icon activated by Trigger (likely left click). Showing dialog.")
    self.show_dialog()
def exit_app(self):
    """Ask the file watcher to stop and quit the application."""
    print("Exit action triggered. Stopping watcher and quitting app.")
    watcher = self.file_watcher
    if watcher:
        # Only a stop request; the thread is not waited for, so the GUI
        # thread is not blocked during shutdown.
        watcher.stop_watching()
    self.app.quit()
def handle_new_domain(self, domain):
    """React to a domain reported by the file watcher.

    A domain that has not been seen in this session is remembered, added to
    the dialog table when the dialog is visible, and announced through a
    notification box. Nothing happens while notifications are disabled.

    Args:
        domain: Domain name as emitted by the watcher's signal.
    """
    print(f"handle_new_domain received signal for: {domain} (Main Thread)")

    if not ConfigManager.get("notifications_enabled", True):
        print("Notifications disabled in config. Skipping notification.")
        return

    display_domain = domain.strip()
    domain_lower = display_domain.lower()
    # Empty names and names already handled in this session are ignored.
    if not domain_lower or domain_lower in self.seen_domains:
        return

    print(f"Domain '{domain}' (as '{domain_lower}') is new to the session ({len(self.seen_domains)} items).")
    self.seen_domains.add(domain_lower)

    dialog = self.dialog
    if dialog is not None and dialog.isVisible():
        print(f"Dialog is open. Attempting to add '{domain}' to dialog table.")
        # The dialog performs its own duplicate check.
        dialog.add_domain_row(display_domain)

    # The notification is shown whether or not the dialog is open.
    self.show_domain_notification(display_domain)
def show_domain_notification(self, full_domain):
    """Ask the user, in a self-closing message box, whether to whitelist a domain.

    The box offers "Whitelist Full", "Whitelist Base" and "Ignore" and shows
    a countdown; when the configured timeout elapses it is closed by
    ``_handle_msg_box_timeout``. The outcome is processed by
    ``_handle_msg_box_finished``. If a notification box is already visible,
    the call returns without showing another one.

    Args:
        full_domain: The blocked domain to present to the user.
    """
    import html  # Local import: only needed to escape the rich-text message.

    print(f"Attempting to show notification message box for: {full_domain}")

    # Never stack several notification boxes.
    if self._current_msg_box is not None and self._current_msg_box.isVisible():
        print("Notification box already visible. Skipping new notification.")
        return

    # No parent, so the box does not depend on the dialog being open.
    msg_box = QMessageBox(None)
    self._current_msg_box = msg_box
    msg_box.setWindowTitle(f"{APP_NAME} - New Domain Blocked")
    msg_box.setIcon(QMessageBox.Question)
    # The text is rendered as rich text (for the bold domain), so the domain
    # read from the log is escaped to keep it from being parsed as markup.
    msg_box.setTextFormat(Qt.RichText)
    msg_box.setText(f"Blocked domain detected:<br><br><br> <b>{html.escape(full_domain)}</b><br><br><br>Whitelist this domain?")

    btn_whitelist_full = msg_box.addButton("Whitelist Full", QMessageBox.AcceptRole)
    btn_whitelist_base = msg_box.addButton("Whitelist Base", QMessageBox.AcceptRole)
    btn_ignore = msg_box.addButton("Ignore", QMessageBox.RejectRole)
    msg_box.setDefaultButton(btn_ignore)  # Pressing Enter ignores.

    # Action names equal the button texts and the configured default action.
    button_map = {
        btn_whitelist_full: "Whitelist Full",
        btn_whitelist_base: "Whitelist Base",
        btn_ignore: "Ignore",
    }

    # The timers need whole, non-negative seconds; fall back to the default
    # when the configured value is unusable.
    timeout_seconds = ConfigManager.get("messagebox_timeout_seconds", DEFAULT_TIMEOUT_SECONDS)
    try:
        timeout_seconds = max(0, int(timeout_seconds))
    except (TypeError, ValueError):
        timeout_seconds = DEFAULT_TIMEOUT_SECONDS
    print(f"Message box timeout set to {timeout_seconds} seconds.")

    # Countdown label, placed in a new row at the bottom of the box's layout.
    self._countdown_value = timeout_seconds
    self._msg_box_timer_label = QLabel(f"Closing in {self._countdown_value} seconds...", msg_box)
    self._msg_box_timer_label.setAlignment(Qt.AlignCenter)
    self._msg_box_timer_label.setTextFormat(Qt.RichText)
    layout = msg_box.layout()
    layout.addWidget(self._msg_box_timer_label, layout.rowCount(), 0,
                     1, layout.columnCount(), Qt.AlignCenter)

    # Restart both timers from a clean state.
    if self._msg_box_close_timer.isActive():
        self._msg_box_close_timer.stop()
    if self._msg_box_countdown_timer.isActive():
        self._msg_box_countdown_timer.stop()
    self._msg_box_close_timer.start(timeout_seconds * 1000)
    self._msg_box_countdown_timer.start(1000)
    print(f"Message box timers started. Close in {timeout_seconds}s, countdown ticks every 1s.")
    print(f"Showing message box for {full_domain}.")

    # finished is emitted for button clicks as well as for the timeout.
    msg_box.finished.connect(
        lambda result: self._handle_msg_box_finished(result, full_domain, button_map))

    try:
        # Modal: returns once the box has been closed.
        msg_box.exec_()
    finally:
        # Always leave timers and box state reset, whatever happened above.
        if self._msg_box_close_timer.isActive():
            self._msg_box_close_timer.stop()
        if self._msg_box_countdown_timer.isActive():
            self._msg_box_countdown_timer.stop()
        self._current_msg_box = None
        self._msg_box_timer_label = None
        self._countdown_value = 0
- # ADDED: Slot to update the countdown timer label
def _update_msg_box_timer_label(self):
    """Advance the countdown by one tick and refresh the label in the box."""
    self._countdown_value -= 1
    remaining = self._countdown_value

    # Nothing to refresh when the box or its label is gone.
    if not (self._current_msg_box and self._msg_box_timer_label):
        return

    if remaining > 0:
        self._msg_box_timer_label.setText(f"Closing in {remaining} seconds...")
    elif remaining == 0:
        self._msg_box_timer_label.setText("Closing now...")
    # Below zero the label is left untouched.
def _handle_msg_box_timeout(self):
    """Close the visible notification box when the close timer fires."""
    print("Message box CLOSE timer timeout signal received.")
    box = self._current_msg_box
    if not (box and box.isVisible()):
        print("CLOSE timer timed out but message box was not visible/valid.")
        return

    print("Closing message box due to timeout.")
    if self._msg_box_countdown_timer.isActive():
        self._msg_box_countdown_timer.stop()
        print("Stopped countdown timer in close timeout handler.")
    # Finishing with Rejected emits `finished`; the connected handler then
    # decides which action to apply.
    box.done(QMessageBox.Rejected)
def _handle_msg_box_finished(self, result, full_domain, button_map):
    """React to the prompt being finished, whether by a button or by timeout.

    Args:
        result: Result code the message box finished with.
        full_domain: The blocked domain the prompt asked about.
        button_map: Mapping from the prompt's button objects to action
            names ("Whitelist Full", "Whitelist Base" or "Ignore").

    The action is taken from ``button_map`` when a mapped button was
    clicked. With no mapped button and a ``QMessageBox.Rejected`` result
    (what the timeout handler produces), the configured
    ``default_prompt_action`` is used. Any other outcome is treated as
    "Ignore".
    """
    print(f"_handle_msg_box_finished handler called for {full_domain}. Result: {result}") # Debug print

    # Whatever closed the box, neither timer should keep running afterwards.
    timers_and_notices = (
        (self._msg_box_close_timer, "Close timer stopped in finished handler."),
        (self._msg_box_countdown_timer, "Countdown timer stopped in finished handler."),
    )
    for timer, notice in timers_and_notices:
        if timer.isActive():
            timer.stop()
            print(notice) # Debug print

    # Work out which action applies: explicit click, timeout default, or fallback.
    pressed = self._current_msg_box.clickedButton()
    if pressed in button_map:
        action = button_map[pressed]
        print(f"User clicked button: {action}") # Debug print
    elif result == QMessageBox.Rejected:
        # No mapped button plus Rejected: most likely closed by the timeout slot.
        action = ConfigManager.get("default_prompt_action", DEFAULT_PROMPT_ACTION)
        print(f"Message box finished without button click (result Rejected). Applying default action: {action}") # Debug print
    else:
        # Any other close we cannot attribute to a button is ignored.
        print(f"Message box finished unexpectedly with result {result}, clicked_button is None. Cannot determine action precisely, defaulting to Ignore.") # Debug print
        action = "Ignore"

    print(f"Handling message box action '{action}' for domain '{full_domain}'.") # Debug print

    if action == "Whitelist Full":
        print(f"Whitelisting full domain: {full_domain}") # Debug print
        # Only whitespace is trimmed here; per the original notes the
        # whitelist utility handles lower-casing itself.
        add_domain_to_whitelist(full_domain.strip())
    elif action == "Whitelist Base":
        print(f"Whitelisting base domain for: {full_domain}") # Debug print
        trimmed = full_domain.strip()
        # Start from the full domain so a failed extraction still whitelists something.
        target = trimmed
        try:
            parts = tldextract.extract(trimmed)
            # A usable base domain needs both a registered name and a suffix.
            if parts.domain and parts.suffix:
                target = f"{parts.domain}.{parts.suffix}"
                print(f"Extracted base domain: {target}") # Debug print
        except Exception as exc:
            # Deliberately no message box from inside this handler; log and fall back.
            print(f"ERROR extracting base domain for '{full_domain}': {exc}. Using full domain.") # Debug print
            target = trimmed
        add_domain_to_whitelist(target)
    elif action == "Ignore":
        print(f"Ignoring domain: {full_domain}") # Debug print
    else:
        # An unrecognised action name (e.g. from config) is treated like Ignore.
        print(f"Unknown action '{action}'. Ignoring domain: {full_domain}") # Debug print
# --- Main Application Entry Point ---
if __name__ == '__main__':
    print("Application starting.") # Debug print

    # Configuration is loaded before any Qt object is created.
    ConfigManager.load()

    app = QApplication(sys.argv)
    # The app lives in the tray, so closing the dialog must not quit it.
    app.setQuitOnLastWindowClosed(False)

    # Make sure every directory that holds one of our files exists.
    needed_dirs = [BASE_DIR] + [
        os.path.dirname(path)
        for path in (TEXT_FILE_PATH, OUTPUT_FILE_PATH, CONFIG_FILE_PATH)
    ]
    for directory in needed_dirs:
        # An empty string means the path had no directory part; skip it,
        # along with anything that is already present.
        if not directory or os.path.exists(directory):
            continue
        try:
            os.makedirs(directory, exist_ok=True)
            print(f"Ensured directory exists: {directory}") # Debug print
        except Exception as err:
            # Best effort only: start-up continues without the directory.
            print(f"Warning: Could not create directory {directory}: {err}") # Debug print

    # The tray icon image is expected next to the script/executable.
    icon_path = os.path.join(BASE_DIR, "icon.png")
    if not os.path.exists(icon_path):
        app_icon = QIcon() # Empty icon as a fallback
        print(f"Warning: Icon file not found at: {icon_path}. Using default icon.") # Debug print
    else:
        app_icon = QIcon(icon_path)
        print(f"Loaded icon from: {icon_path}") # Debug print

    # The QApplication instance is passed as the tray icon's parent.
    tray_icon = TrayIcon(app_icon, app)
    tray_icon.show()
    print("Tray icon created and shown.") # Debug print

    # Use the same icon for the application's windows.
    app.setWindowIcon(app_icon)

    # Hand control to Qt; the exit code is available once the loop ends.
    print("Starting Qt event loop.") # Debug print
    exit_code = app.exec_()
    print(f"Qt event loop finished with exit code: {exit_code}") # Debug print

    # Last message before the exit code is passed on to the interpreter.
    print("Application finished.") # Debug print
    sys.exit(exit_code)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement