Advertisement
El_Chaderino

cognitive timing vulnerability and modulation opportunity Module

May 27th, 2025
992
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 67.56 KB | None | 0 0
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3.  
  4. """
  5. TimingAttackFlagger Module
  6. https://github.com/ElChaderino/The-Squiggle-Interpreter
  7. Identifies windows of heightened cognitive timing vulnerability or modulation opportunity
  8. using cross-frequency phase-amplitude coupling (PAC). Enhanced with multi-band, cross-channel,
  9. multi-scale, and event-related PAC, statistical validation, advanced visualization, and
  10. integration with Swingle/Gunkelman-inspired clinical protocols and the Squiggle Interpreter.
  11. """
  12.  
  13. import numpy as np
  14. import scipy.signal as signal
  15. from typing import List, Tuple, Dict, Optional, Union
  16. import logging
  17. import matplotlib.pyplot as plt
  18. from pathlib import Path
  19. import os
  20. from scipy.stats import zscore
  21. import sklearn
  22. import pywt
  23.  
  24. logger = logging.getLogger(__name__)
  25.  
  26. class TimingAttackFlagger:
  27.     def __init__(
  28.         self,
  29.         sfreq: float,
  30.         bands: Dict[str, Tuple[float, float]] = None,
  31.         threshold: float = 0.6,
  32.         n_surrogates: int = 100,
  33.         p_value: float = 0.05,
  34.         coherence_threshold: float = 0.6,
  35.         baseline_duration: float = 60.0,
  36.         normative_mi_mean: float = 0.4,
  37.         normative_mi_std: float = 0.1
  38.     ):
  39.         """
  40.        Initialize the TimingAttackFlagger.
  41.  
  42.        Args:
  43.            sfreq (float): Sampling frequency in Hz.
  44.            bands (Dict[str, Tuple[float, float]]): Frequency bands for PAC analysis.
  45.                Default: {'delta': (1, 4), 'theta': (4, 8), 'alpha': (8, 12), 'gamma': (30, 80)}.
  46.            threshold (float): Modulation index threshold for flagging (0 to 1). Default: 0.6.
  47.            n_surrogates (int): Number of surrogate datasets for validation. Default: 100.
  48.            p_value (float): P-value threshold for significant PAC. Default: 0.05.
  49.            coherence_threshold (float): Coherence threshold for cross-channel PAC. Default: 0.6.
  50.            baseline_duration (float): Duration (s) for baseline MI calculation. Default: 60.0.
  51.            normative_mi_mean (float): Mean MI for z-score normalization. Default: 0.4.
  52.            normative_mi_std (float): Std dev of MI for z-score normalization. Default: 0.1.
  53.        """
  54.         self.sfreq = sfreq
  55.         self.bands = bands if bands else {
  56.             'delta': (1, 4),
  57.             'theta': (4, 8),
  58.             'alpha': (8, 12),
  59.             'gamma': (30, 80)
  60.         }
  61.         self.threshold = threshold
  62.         self.n_surrogates = n_surrogates
  63.         self.p_value = p_value
  64.         self.coherence_threshold = coherence_threshold
  65.         self.baseline_duration = baseline_duration
  66.         self.normative_mi_mean = normative_mi_mean
  67.         self.normative_mi_std = normative_mi_std
  68.         self.baseline_mi = None
  69.  
  70.     def bandpass(self, data: np.ndarray, band: Tuple[float, float]) -> np.ndarray:
  71.         """
  72.        Apply bandpass filter to EEG data.
  73.  
  74.        Args:
  75.            data (np.ndarray): Input EEG signal (1D).
  76.            band (Tuple[float, float]): Frequency band (low, high) in Hz.
  77.  
  78.        Returns:
  79.            np.ndarray: Filtered signal.
  80.        """
  81.         nyq = 0.5 * self.sfreq
  82.         low, high = band[0] / nyq, band[1] / nyq
  83.         b, a = signal.butter(4, [low, high], btype='band')
  84.         return signal.filtfilt(b, a, data)
  85.  
  86.     def compute_coherence(self, signal1: np.ndarray, signal2: np.ndarray, freq_band: Tuple[float, float]) -> float:
  87.         """Compute coherence between two signals in a frequency band."""
  88.         f, Cxy = signal.coherence(signal1, signal2, fs=self.sfreq, nperseg=int(self.sfreq))
  89.         idx = (f >= freq_band[0]) & (f <= freq_band[1])
  90.         return np.mean(Cxy[idx]) if np.any(idx) else 0.0
  91.  
  92.     def compute_phase_amplitude_coupling(
  93.         self,
  94.         phase_signal: np.ndarray,
  95.         amplitude_signal: np.ndarray
  96.     ) -> Tuple[float, float]:
  97.         """
  98.        Compute phase-amplitude coupling (PAC) modulation index and p-value.
  99.  
  100.        Args:
  101.            phase_signal (np.ndarray): Phase signal (e.g., theta phase).
  102.            amplitude_signal (np.ndarray): Amplitude signal (e.g., gamma amplitude).
  103.  
  104.        Returns:
  105.            Tuple[float, float]: Modulation index and p-value from surrogate testing.
  106.        """
  107.         n_bins = 18
  108.         bins = np.linspace(-np.pi, np.pi, n_bins + 1)
  109.         pac = np.zeros(n_bins)
  110.  
  111.         for i in range(n_bins):
  112.             idx = np.where((phase_signal >= bins[i]) & (phase_signal < bins[i + 1]))[0]
  113.             if len(idx) > 0:
  114.                 pac[i] = np.mean(amplitude_signal[idx])
  115.  
  116.         pac /= pac.sum() + 1e-10
  117.         entropy = -np.sum(pac * np.log(pac + 1e-10)) / np.log(n_bins)
  118.         modulation_index = 1 - entropy
  119.  
  120.         # Surrogate testing for statistical significance
  121.         surrogate_mis = []
  122.         for _ in range(self.n_surrogates):
  123.             shuffled_phase = np.random.permutation(phase_signal)
  124.             surrogate_pac = np.zeros(n_bins)
  125.             for i in range(n_bins):
  126.                 idx = np.where((shuffled_phase >= bins[i]) & (shuffled_phase < bins[i + 1]))[0]
  127.                 if len(idx) > 0:
  128.                     surrogate_pac[i] = np.mean(amplitude_signal[idx])
  129.             surrogate_pac /= surrogate_pac.sum() + 1e-10
  130.             surrogate_entropy = -np.sum(surrogate_pac * np.log(surrogate_pac + 1e-10)) / np.log(n_bins)
  131.             surrogate_mis.append(1 - surrogate_entropy)
  132.        
  133.         p_value = np.mean(np.array(surrogate_mis) >= modulation_index)
  134.         return modulation_index, p_value
  135.  
  136.     def compute_baseline_mi(
  137.         self, eeg_data: np.ndarray, phase_band: str = 'theta', amplitude_band: str = 'gamma'
  138.     ) -> float:
  139.         """Compute baseline modulation index from a resting-state segment."""
  140.         baseline_samples = int(self.baseline_duration * self.sfreq)
  141.         if len(eeg_data) < baseline_samples:
  142.             logger.warning("EEG data shorter than baseline duration; using full data.")
  143.             baseline_samples = len(eeg_data)
  144.  
  145.         phase_signal = self.bandpass(eeg_data[:baseline_samples], self.bands[phase_band])
  146.         amplitude_signal = self.bandpass(eeg_data[:baseline_samples], self.bands[amplitude_band])
  147.         phase_data = np.angle(signal.hilbert(phase_signal))
  148.         amplitude_data = np.abs(signal.hilbert(amplitude_signal))
  149.         mi, _ = self.compute_phase_amplitude_coupling(phase_data, amplitude_data)
  150.         self.baseline_mi = mi
  151.         logger.info(f"Computed baseline MI: {mi:.4f}")
  152.         return mi
  153.  
  154.     def normalize_pac(self, pac_scores: List[float]) -> List[float]:
  155.         """Normalize PAC scores to z-scores using normative or baseline data."""
  156.         if self.baseline_mi is not None:
  157.             mean = self.baseline_mi
  158.             std = self.normative_mi_std
  159.         else:
  160.             mean = self.normative_mi_mean
  161.             std = self.normative_mi_std
  162.         return zscore(pac_scores, mu=mean, sigma=std).tolist()
  163.  
  164.     def flag_sequences(
  165.         self,
  166.         eeg_data: np.ndarray,
  167.         phase_band: str = 'theta',
  168.         amplitude_band: str = 'gamma',
  169.         channel_name: str = 'Unknown',
  170.         task_timestamps: Optional[List[Tuple[float, float]]] = None
  171.     ) -> Tuple[List[Tuple[float, float, float, float]], List[float]]:
  172.         """
  173.        Flag windows with high PAC for a single channel, optionally within task segments.
  174.  
  175.        Args:
  176.            eeg_data (np.ndarray): EEG signal (1D).
  177.            phase_band (str): Band for phase (e.g., 'theta', 'alpha'). Default: 'theta'.
  178.            amplitude_band (str): Band for amplitude (e.g., 'gamma'). Default: 'gamma'.
  179.            channel_name (str): Name of the channel. Default: 'Unknown'.
  180.            task_timestamps (Optional[List[Tuple[float, float]]]): List of task segments.
  181.  
  182.        Returns:
  183.            Tuple[List[Tuple[float, float, float, float]], List[float]]:
  184.                - List of (start_time, end_time, modulation_index, p_value) for flagged windows.
  185.                - List of modulation indices for all windows.
  186.        """
  187.         phase_signal = self.bandpass(eeg_data, self.bands[phase_band])
  188.         amplitude_signal = self.bandpass(eeg_data, self.bands[amplitude_band])
  189.  
  190.         phase_data = np.angle(signal.hilbert(phase_signal))
  191.         amplitude_data = np.abs(signal.hilbert(amplitude_signal))
  192.  
  193.         win_len = int(2 * self.sfreq)
  194.         step = int(0.5 * self.sfreq)
  195.         flags = []
  196.         pac_scores = []
  197.  
  198.         if task_timestamps:
  199.             for start_t, end_t in task_timestamps:
  200.                 start = int(start_t * self.sfreq)
  201.                 end = min(int(end_t * self.sfreq), len(eeg_data))
  202.                 for win_start in range(start, end - win_len, step):
  203.                     win_end = win_start + win_len
  204.                     mi, p_val = self.compute_phase_amplitude_coupling(
  205.                         phase_data[win_start:win_end], amplitude_data[win_start:win_end]
  206.                     )
  207.                     pac_scores.append(mi)
  208.                     if mi > self.threshold and p_val < self.p_value:
  209.                         flags.append((win_start / self.sfreq, win_end / self.sfreq, mi, p_val))
  210.         else:
  211.             for start in range(0, len(eeg_data) - win_len, step):
  212.                 end = start + win_len
  213.                 mi, p_val = self.compute_phase_amplitude_coupling(
  214.                     phase_data[start:end], amplitude_data[start:end]
  215.                 )
  216.                 pac_scores.append(mi)
  217.                 if mi > self.threshold and p_val < self.p_value:
  218.                     flags.append((start / self.sfreq, end / self.sfreq, mi, p_val))
  219.  
  220.         logger.info(f"Flagged {len(flags)} high PAC windows for {channel_name} ({phase_band}-{amplitude_band}).")
  221.         return flags, pac_scores
  222.  
  223.     def flag_cross_channel(
  224.         self,
  225.         eeg_data_phase: np.ndarray,
  226.         eeg_data_amplitude: np.ndarray,
  227.         phase_channel: str,
  228.         amplitude_channel: str,
  229.         phase_band: str = 'theta',
  230.         amplitude_band: str = 'gamma'
  231.     ) -> Tuple[List[Tuple[float, float, float, float]], List[float]]:
  232.         """
  233.        Flag windows with high cross-channel PAC, gated by coherence.
  234.  
  235.        Args:
  236.            eeg_data_phase (np.ndarray): EEG signal for phase channel.
  237.            eeg_data_amplitude (np.ndarray): EEG signal for amplitude channel.
  238.            phase_channel (str): Name of phase channel.
  239.            amplitude_channel (str): Name of amplitude channel.
  240.            phase_band (str): Band for phase. Default: 'theta'.
  241.            amplitude_band (str): Band for amplitude. Default: 'gamma'.
  242.  
  243.        Returns:
  244.            Tuple[List[Tuple[float, float, float, float]], List[float]]:
  245.                - List of (start_time, end_time, modulation_index, p_value) for flagged windows.
  246.                - List of modulation indices for all windows.
  247.        """
  248.         coherence = self.compute_coherence(eeg_data_phase, eeg_data_amplitude, self.bands[phase_band])
  249.         if coherence < self.coherence_threshold:
  250.             logger.warning(f"Coherence ({coherence:.2f}) below threshold ({self.coherence_threshold}). Skipping PAC.")
  251.             return [], []
  252.  
  253.         phase_signal = self.bandpass(eeg_data_phase, self.bands[phase_band])
  254.         amplitude_signal = self.bandpass(eeg_data_amplitude, self.bands[amplitude_band])
  255.  
  256.         phase_data = np.angle(signal.hilbert(phase_signal))
  257.         amplitude_data = np.abs(signal.hilbert(amplitude_signal))
  258.  
  259.         win_len = int(2 * self.sfreq)
  260.         step = int(0.5 * self.sfreq)
  261.         flags = []
  262.         pac_scores = []
  263.  
  264.         for start in range(0, len(eeg_data_phase) - win_len, step):
  265.             end = start + win_len
  266.             mi, p_val = self.compute_phase_amplitude_coupling(
  267.                 phase_data[start:end], amplitude_data[start:end]
  268.             )
  269.             pac_scores.append(mi)
  270.             if mi > self.threshold and p_val < self.p_value:
  271.                 flags.append((start / self.sfreq, end / self.sfreq, mi, p_val))
  272.  
  273.         logger.info(f"Flagged {len(flags)} cross-channel PAC windows ({phase_channel}-{amplitude_channel}).")
  274.         return flags, pac_scores
  275.  
  276.     def multi_scale_flag_sequences(
  277.         self,
  278.         eeg_data: np.ndarray,
  279.         phase_band: str = 'theta',
  280.         amplitude_band: str = 'gamma',
  281.         channel_name: str = 'Unknown',
  282.         window_lengths: List[float] = [1.0, 2.0, 5.0]
  283.     ) -> Dict[float, Tuple[List[Tuple[float, float, float, float]], List[float]]]:
  284.         """Flag PAC windows across multiple time scales."""
  285.         results = {}
  286.         for win_len in window_lengths:
  287.             self.threshold = self.threshold * (2.0 / win_len)  # Adjust threshold for scale
  288.             flags, pac_scores = self.flag_sequences(
  289.                 eeg_data, phase_band, amplitude_band, channel_name
  290.             )
  291.             results[win_len] = (flags, pac_scores)
  292.             self.threshold = self.threshold / (2.0 / win_len)  # Reset threshold
  293.         return results
  294.  
  295.     def event_related_pac(
  296.         self,
  297.         eeg_data: np.ndarray,
  298.         event_times: List[float],
  299.         channel_name: str,
  300.         window_pre: float = 0.5,
  301.         window_post: float = 1.0,
  302.         phase_band: str = 'theta',
  303.         amplitude_band: str = 'gamma'
  304.     ) -> List[Tuple[float, float, float]]:
  305.         """Compute PAC around specific events."""
  306.         phase_signal = self.bandpass(eeg_data, self.bands[phase_band])
  307.         amplitude_signal = self.bandpass(eeg_data, self.bands[amplitude_band])
  308.         phase_data = np.angle(signal.hilbert(phase_signal))
  309.         amplitude_data = np.abs(signal.hilbert(amplitude_signal))
  310.  
  311.         results = []
  312.         for event_time in event_times:
  313.             start = int((event_time - window_pre) * self.sfreq)
  314.             end = int((event_time + window_post) * self.sfreq)
  315.             if start >= 0 and end < len(eeg_data):
  316.                 mi, p_val = self.compute_phase_amplitude_coupling(
  317.                     phase_data[start:end], amplitude_data[start:end]
  318.                 )
  319.                 if mi > self.threshold and p_val < self.p_value:
  320.                     results.append((event_time, mi, p_val))
  321.  
  322.         logger.info(f"Flagged {len(results)} event-related PAC windows for {channel_name}.")
  323.         return results
  324.  
  325.     def correlate_with_vigilance(
  326.         self,
  327.         pac_scores: List[float],
  328.         vigilance_states: List[Tuple[float, str]],
  329.         time_step: float = 0.5
  330.     ) -> Dict[str, List[float]]:
  331.         """Correlate PAC scores with vigilance states."""
  332.         pac_times = np.arange(0, len(pac_scores) * time_step, time_step)
  333.         vigilance_dict = {state: [] for state in set(s for _, s in vigilance_states)}
  334.  
  335.         for i, pac in enumerate(pac_scores):
  336.             pac_time = pac_times[i]
  337.             closest_vigilance = min(vigilance_states, key=lambda x: abs(x[0] - pac_time), default=(0, 'Undefined'))
  338.             vigilance_dict[closest_vigilance[1]].append(pac)
  339.  
  340.         return {state: pacs for state, pacs in vigilance_dict.items() if pacs}
  341.  
  342.     def suggest_protocols(
  343.         self,
  344.         flags: List[Tuple[float, float, float, float]],
  345.         channel_name: str,
  346.         phase_band: str = 'theta',
  347.         amplitude_band: str = 'gamma'
  348.     ) -> List[str]:
  349.         """Suggest Swingle-style neurofeedback protocols based on PAC findings."""
  350.         protocols = []
  351.         if flags:
  352.             avg_mi = np.mean([mi for _, _, mi, _ in flags])
  353.             if avg_mi > 0.8:
  354.                 protocols.append(f"Inhibit high {phase_band}-{amplitude_band} PAC at {channel_name} to reduce hyperarousal.")
  355.             else:
  356.                 protocols.append(f"Reward gamma bursts during high {phase_band} phase at {channel_name} to enhance attention.")
  357.             protocols.append(f"SMR training at Cz (12–15 Hz) during flagged PAC windows.")
  358.             protocols.append("Adjunct: Cognitive task training during high PAC periods.")
  359.         else:
  360.             protocols.append("No high PAC detected; consider standard SMR training at Cz.")
  361.         return protocols
  362.  
  363.     def map_to_phenotype(
  364.         self,
  365.         pac_scores: List[float],
  366.         channel_name: str,
  367.         phenotypes: List[Dict]
  368.     ) -> List[Tuple[str, float]]:
  369.         """Map PAC scores to phenotypes from phenotype_ruleset.py."""
  370.         matches = []
  371.         avg_pac = np.mean(pac_scores) if pac_scores else 0.0
  372.         for phenotype in phenotypes:
  373.             condition = phenotype.get('condition')
  374.             if callable(condition) and condition({'pac_theta_gamma': avg_pac}):
  375.                 matches.append((phenotype['name'], phenotype['confidence']))
  376.         return matches
  377.  
  378.     def plot_comodulogram(
  379.         self,
  380.         eeg_data: np.ndarray,
  381.         channel_name: str,
  382.         output_dir: Union[str, Path],
  383.         phase_band: str = 'theta',
  384.         amplitude_band: str = 'gamma'
  385.     ) -> Optional[str]:
  386.         """
  387.        Generate and save a PAC comodulogram.
  388.  
  389.        Args:
  390.            eeg_data (np.ndarray): EEG signal (1D).
  391.            channel_name (str): Name of the channel.
  392.            output_dir (Union[str, Path]): Directory to save the plot.
  393.            phase_band (str): Band for phase. Default: 'theta'.
  394.            amplitude_band (str): Band for amplitude. Default: 'gamma'.
  395.  
  396.        Returns:
  397.            Optional[str]: Path to saved plot, or None if plotting fails.
  398.        """
  399.         if not plt:
  400.             logger.error("Matplotlib not available for plotting.")
  401.             return None
  402.  
  403.         output_dir = Path(output_dir)
  404.         output_dir.mkdir(parents=True, exist_ok=True)
  405.  
  406.         phase_signal = self.bandpass(eeg_data, self.bands[phase_band])
  407.         amplitude_signal = self.bandpass(eeg_data, self.bands[amplitude_band])
  408.         phase_data = np.angle(signal.hilbert(phase_signal))
  409.         amplitude_data = np.abs(signal.hilbert(amplitude_signal))
  410.  
  411.         n_bins = 18
  412.         bins = np.linspace(-np.pi, np.pi, n_bins + 1)
  413.         pac = np.zeros(n_bins)
  414.  
  415.         for i in range(n_bins):
  416.             idx = np.where((phase_data >= bins[i]) & (phase_data < bins[i + 1]))[0]
  417.             if len(idx) > 0:
  418.                 pac[i] = np.mean(amplitude_data[idx])
  419.  
  420.         plt.style.use('dark_background')
  421.         plt.figure(figsize=(8, 6))
  422.         plt.bar(bins[:-1], pac, width=2 * np.pi / n_bins, align='edge', color='cyan')
  423.         plt.xlabel(f'{phase_band.capitalize()} Phase (radians)', color='white')
  424.         plt.ylabel(f'{amplitude_band.capitalize()} Amplitude', color='white')
  425.         plt.title(f'PAC Comodulogram ({channel_name}, {phase_band}-{amplitude_band})', color='white')
  426.         plt.tight_layout()
  427.  
  428.         plot_path = output_dir / f"pac_comodulogram_{channel_name}_{phase_band}_{amplitude_band}.png"
  429.         plt.savefig(plot_path, dpi=150, facecolor='black')
  430.         plt.close()
  431.         logger.info(f"Saved PAC comodulogram to {plot_path}")
  432.         return str(plot_path)
  433.  
  434.     def plot_pac_timeseries(
  435.         self,
  436.         pac_scores: List[float],
  437.         flags: List[Tuple[float, float, float, float]],
  438.         channel_name: str,
  439.         output_dir: Union[str, Path],
  440.         phase_band: str = 'theta',
  441.         amplitude_band: str = 'gamma',
  442.         time_step: float = 0.5
  443.     ) -> Optional[str]:
  444.         """Plot PAC modulation index over time with flagged windows."""
  445.         if not plt:
  446.             logger.error("Matplotlib not available for plotting.")
  447.             return None
  448.  
  449.         output_dir = Path(output_dir)
  450.         output_dir.mkdir(parents=True, exist_ok=True)
  451.  
  452.         times = np.arange(0, len(pac_scores) * time_step, time_step)
  453.         plt.style.use('dark_background')
  454.         plt.figure(figsize=(12, 5))
  455.         plt.plot(times, pac_scores, color='cyan', label='Modulation Index')
  456.         for start, end, mi, _ in flags:
  457.             plt.axvspan(start, end, color='red', alpha=0.3)
  458.         plt.xlabel('Time (s)', color='white')
  459.         plt.ylabel('Modulation Index', color='white')
  460.         plt.title(f'PAC Time Series ({channel_name}, {phase_band}-{amplitude_band})', color='white')
  461.         plt.legend(facecolor='black', edgecolor='white', labelcolor='white')
  462.         plt.tight_layout()
  463.  
  464.         plot_path = output_dir / f"pac_timeseries_{channel_name}_{phase_band}_{amplitude_band}.png"
  465.         plt.savefig(plot_path, dpi=150, facecolor='black')
  466.         plt.close()
  467.         logger.info(f"Saved PAC time series to {plot_path}")
  468.         return str(plot_path)
  469.  
  470.     def report_output(
  471.         self,
  472.         flags: List[Tuple[float, float, float, float]],
  473.         channel_name: str,
  474.         phase_band: str = 'theta',
  475.         amplitude_band: str = 'gamma',
  476.         cross_channel: Optional[str] = None
  477.     ) -> str:
  478.         """
  479.        Generate a text report for flagged PAC windows.
  480.  
  481.        Args:
  482.            flags (List[Tuple[float, float, float, float]]): List of flagged windows.
  483.            channel_name (str): Name of the phase channel.
  484.            phase_band (str): Band for phase. Default: 'theta'.
  485.            amplitude_band (str): Band for amplitude. Default: 'gamma'.
  486.            cross_channel (Optional[str]): Name of amplitude channel for cross-channel PAC.
  487.  
  488.        Returns:
  489.            str: Formatted text report.
  490.        """
  491.         if not flags:
  492.             return f"No high PAC ({phase_band}-{amplitude_band}) windows detected on {channel_name}.\n"
  493.  
  494.         if cross_channel:
  495.             report = f"Cross-Channel Timing Attack Report ({channel_name}-{cross_channel}, {phase_band}-{amplitude_band}):\n"
  496.         else:
  497.             report = f"Timing Attack Report for Channel {channel_name} ({phase_band}-{amplitude_band}):\n"
  498.         report += f"Detected {len(flags)} windows exceeding PAC threshold ({self.threshold}, p<{self.p_value}):\n\n"
  499.         report += f"{'Start (s)':>10} | {'End (s)':>10} | {'Modulation Index':>18} | {'P-value':>10}\n"
  500.         report += f"{'-'*60}\n"
  501.  
  502.         for start, end, mi, p_val in flags:
  503.             report += f"{start:10.2f} | {end:10.2f} | {mi:18.4f} | {p_val:10.4f}\n"
  504.  
  505.         return report
  506.  
  507.     def to_html_report(
  508.         self,
  509.         flags: List[Tuple[float, float, float, float]],
  510.         channel_name: str,
  511.         pac_scores: List[float],
  512.         plot_paths: Dict[str, str] = None,
  513.         phase_band: str = 'theta',
  514.         amplitude_band: str = 'gamma',
  515.         cross_channel: Optional[str] = None,
  516.         vigilance_correlation: Optional[Dict[str, List[float]]] = None,
  517.         protocols: Optional[List[str]] = None,
  518.         phenotypes: Optional[List[Tuple[str, float]]] = None,
  519.         enhanced_results: Optional[Dict[str, Any]] = None
  520.     ) -> str:
  521.         """
  522.        Generate an HTML report for PAC analysis.
  523.  
  524.        Args:
  525.            flags (List[Tuple[float, float, float, float]]): List of flagged windows.
  526.            channel_name (str): Name of the phase channel.
  527.            pac_scores (List[float]): List of modulation indices for all windows.
  528.            plot_paths (Dict[str, str]): Dictionary of plot paths.
  529.            phase_band (str): Band for phase. Default: 'theta'.
  530.            amplitude_band (str): Band for amplitude. Default: 'gamma'.
  531.            cross_channel (Optional[str]): Name of amplitude channel for cross-channel PAC.
  532.            vigilance_correlation (Optional[Dict[str, List[float]]]): Dictionary of vigilance state correlations.
  533.            protocols (Optional[List[str]]): List of recommended protocols.
  534.            phenotypes (Optional[List[Tuple[str, float]]]): List of associated phenotypes.
  535.            enhanced_results (Optional[Dict[str, Any]]): Results from flag_sequences_enhanced.
  536.  
  537.        Returns:
  538.            str: HTML string for the PAC report.
  539.        """
  540.         lines = []
  541.         title = f"Cross-Channel PAC Analysis ({channel_name}-{cross_channel})" if cross_channel else f"PAC Analysis ({channel_name})"
  542.         lines.append(f"<h2 style='color: #2E7D32;'>{title}</h2>")
  543.         lines.append(f"<p><strong>Bands:</strong> {phase_band.capitalize()}-{amplitude_band.capitalize()}</p>")
  544.         lines.append(f"<p><strong>Threshold:</strong> Modulation Index > {self.threshold}, p < {self.p_value}</p>")
  545.  
  546.         # Add validation results if available
  547.         if enhanced_results and enhanced_results.get('validation'):
  548.             validation = enhanced_results['validation']
  549.             lines.append("<h3>Signal Validation</h3>")
  550.             lines.append("<table border='1' cellpadding='4' style='border-collapse: collapse; width: 60%;'>")
  551.             lines.append("<tr style='background-color: #E8F5E9;'><th>Metric</th><th>Value</th><th>Status</th></tr>")
  552.            
  553.             # Phase continuity
  554.             status = "✓" if validation['phase_continuity'] >= 0.7 else "⚠"
  555.             lines.append(f"<tr><td>Phase Continuity</td><td>{validation['phase_continuity']:.3f}</td><td>{status}</td></tr>")
  556.            
  557.             # Amplitude stability
  558.             status = "✓" if validation['amplitude_stability'] >= 0.5 else "⚠"
  559.             lines.append(f"<tr><td>Amplitude Stability</td><td>{validation['amplitude_stability']:.3f}</td><td>{status}</td></tr>")
  560.            
  561.             lines.append("</table>")
  562.  
  563.             if validation['warnings']:
  564.                 lines.append("<div style='color: #FFA500; margin: 10px 0;'>")
  565.                 lines.append("<strong>Validation Warnings:</strong>")
  566.                 lines.append("<ul>")
  567.                 for warning in validation['warnings']:
  568.                     lines.append(f"<li>{warning}</li>")
  569.                 lines.append("</ul>")
  570.                 lines.append("</div>")
  571.  
  572.         # Add artifact detection results if available
  573.         if enhanced_results and enhanced_results.get('artifacts'):
  574.             lines.append("<h3>Artifact Detection</h3>")
  575.             artifacts = enhanced_results['artifacts']
  576.             if any(artifacts.values()):
  577.                 lines.append("<table border='1' cellpadding='4' style='border-collapse: collapse; width: 60%;'>")
  578.                 lines.append("<tr style='background-color: #E8F5E9;'><th>Time (s)</th><th>Type</th><th>Severity</th></tr>")
  579.                
  580.                 for artifact_type, detections in artifacts.items():
  581.                     for time, type_name, severity in detections:
  582.                         lines.append(f"<tr><td>{time:.2f}</td><td>{type_name}</td><td>{severity:.2f}</td></tr>")
  583.                
  584.                 lines.append("</table>")
  585.             else:
  586.                 lines.append("<p>No significant artifacts detected.</p>")
  587.  
  588.         # Add additional coupling measures if available
  589.         if enhanced_results:
  590.             if enhanced_results.get('phase_phase_coupling'):
  591.                 lines.append("<h3>Phase-Phase Coupling</h3>")
  592.                 lines.append("<table border='1' cellpadding='4' style='border-collapse: collapse; width: 60%;'>")
  593.                 lines.append("<tr style='background-color: #E8F5E9;'><th>Band Pair</th><th>PLV</th><th>P-value</th></tr>")
  594.                
  595.                 for pair, results in enhanced_results['phase_phase_coupling'].items():
  596.                     lines.append(f"<tr><td>{pair}</td><td>{results['plv']:.3f}</td><td>{results['p_value']:.3f}</td></tr>")
  597.                
  598.                 lines.append("</table>")
  599.  
  600.             if enhanced_results.get('amplitude_amplitude_coupling'):
  601.                 lines.append("<h3>Amplitude-Amplitude Coupling</h3>")
  602.                 lines.append("<table border='1' cellpadding='4' style='border-collapse: collapse; width: 60%;'>")
  603.                 lines.append("<tr style='background-color: #E8F5E9;'><th>Band Pair</th><th>Correlation</th><th>P-value</th></tr>")
  604.                
  605.                 for pair, results in enhanced_results['amplitude_amplitude_coupling'].items():
  606.                     lines.append(f"<tr><td>{pair}</td><td>{results['correlation']:.3f}</td><td>{results['p_value']:.3f}</td></tr>")
  607.                
  608.                 lines.append("</table>")
  609.  
  610.         # Original PAC analysis results
  611.         z_scores = self.normalize_pac(pac_scores)
  612.         lines.append("<h3>PAC Score Statistics</h3>")
  613.         lines.append(f"<p><strong>Mean Modulation Index:</strong> {np.mean(pac_scores):.4f}</p>")
  614.         lines.append(f"<p><strong>Std Deviation:</strong> {np.std(pac_scores):.4f}</p>")
  615.         lines.append(f"<p><strong>Mean Z-Score:</strong> {np.mean(z_scores):.2f}</p>")
  616.  
  617.         if flags:
  618.             lines.append(f"<p><strong>Detected Windows:</strong> {len(flags)}</p>")
  619.             lines.append("<table border='1' cellpadding='4' style='border-collapse: collapse; width: 60%;'>")
  620.             lines.append("<tr style='background-color: #E8F5E9;'><th>Start (s)</th><th>End (s)</th><th>Modulation Index</th><th>P-value</th></tr>")
  621.             for start, end, mi, p_val in flags:
  622.                 lines.append(f"<tr><td>{start:.2f}</td><td>{end:.2f}</td><td>{mi:.4f}</td><td>{p_val:.4f}</td></tr>")
  623.             lines.append("</table>")
  624.         else:
  625.             lines.append("<p>No high PAC windows detected.</p>")
  626.  
  627.         if plot_paths:
  628.             for plot_type, plot_path in plot_paths.items():
  629.                 lines.append(f"<h3>{plot_type}</h3>")
  630.                 lines.append(f"<img src='{plot_path}' alt='{plot_type}' style='max-width: 600px;'>")
  631.  
  632.         if vigilance_correlation:
  633.             lines.append("<h3>Vigilance State Correlation</h3>")
  634.             lines.append("<ul>")
  635.             for state, scores in vigilance_correlation.items():
  636.                 lines.append(f"<li>State {state}: Mean PAC = {np.mean(scores):.4f} (n={len(scores)})</li>")
  637.             lines.append("</ul>")
  638.  
  639.         if protocols:
  640.             lines.append("<h3>Recommended Protocols</h3>")
  641.             lines.append("<ul>")
  642.             for protocol in protocols:
  643.                 lines.append(f"<li>{protocol}</li>")
  644.             lines.append("</ul>")
  645.  
  646.         if phenotypes:
  647.             lines.append("<h3>Associated Phenotypes</h3>")
  648.             lines.append("<ul>")
  649.             for name, confidence in phenotypes:
  650.                 lines.append(f"<li>{name} (Confidence: {confidence:.2f})</li>")
  651.             lines.append("</ul>")
  652.  
  653.         # Add any warnings from enhanced analysis
  654.         if enhanced_results and enhanced_results.get('warnings'):
  655.             lines.append("<div style='color: #FFA500; margin: 10px 0;'>")
  656.             lines.append("<strong>Analysis Warnings:</strong>")
  657.             lines.append("<ul>")
  658.             for warning in enhanced_results['warnings']:
  659.                 lines.append(f"<li>{warning}</li>")
  660.             lines.append("</ul>")
  661.             lines.append("</div>")
  662.  
  663.         return "\n".join(lines)
  664.  
  665.     def export_qeeg_report(
  666.         self,
  667.         flags: List[Tuple[float, float, float, float]],
  668.         pac_scores: List[float],
  669.         channel_name: str,
  670.         phase_band: str = 'theta',
  671.         amplitude_band: str = 'gamma'
  672.     ) -> Dict:
  673.         """Export PAC results in a qEEG-compatible format."""
  674.         z_scores = self.normalize_pac(pac_scores)
  675.         return {
  676.             'channel': channel_name,
  677.             'phase_band': phase_band,
  678.             'amplitude_band': amplitude_band,
  679.             'flags': flags,
  680.             'mean_mi': float(np.mean(pac_scores)),
  681.             'std_mi': float(np.std(pac_scores)),
  682.             'mean_z_score': float(np.mean(z_scores)),
  683.             'std_z_score': float(np.std(z_scores))
  684.         }
  685.  
  686.     def compute_phase_phase_coupling(
  687.         self,
  688.         signal1: np.ndarray,
  689.         signal2: np.ndarray,
  690.         band1: Tuple[float, float],
  691.         band2: Tuple[float, float]
  692.     ) -> Tuple[float, float]:
  693.         """
  694.        Compute phase-phase coupling between two frequency bands.
  695.  
  696.        Args:
  697.            signal1 (np.ndarray): First input signal
  698.            signal2 (np.ndarray): Second input signal
  699.            band1 (Tuple[float, float]): Frequency band for first signal
  700.            band2 (Tuple[float, float]): Frequency band for second signal
  701.  
  702.        Returns:
  703.            Tuple[float, float]: Phase locking value and p-value
  704.        """
  705.         # Filter signals in respective bands
  706.         sig1_filt = self.bandpass(signal1, band1)
  707.         sig2_filt = self.bandpass(signal2, band2)
  708.  
  709.         # Extract instantaneous phases
  710.         phase1 = np.angle(signal.hilbert(sig1_filt))
  711.         phase2 = np.angle(signal.hilbert(sig2_filt))
  712.  
  713.         # Compute phase difference
  714.         phase_diff = phase1 - phase2
  715.         plv = np.abs(np.mean(np.exp(1j * phase_diff)))
  716.  
  717.         # Surrogate testing
  718.         surrogate_plvs = []
  719.         for _ in range(self.n_surrogates):
  720.             shuffled_phase2 = np.random.permutation(phase2)
  721.             phase_diff_surr = phase1 - shuffled_phase2
  722.             plv_surr = np.abs(np.mean(np.exp(1j * phase_diff_surr)))
  723.             surrogate_plvs.append(plv_surr)
  724.  
  725.         p_value = np.mean(np.array(surrogate_plvs) >= plv)
  726.         return plv, p_value
  727.  
  728.     def compute_amplitude_amplitude_coupling(
  729.         self,
  730.         signal1: np.ndarray,
  731.         signal2: np.ndarray,
  732.         band1: Tuple[float, float],
  733.         band2: Tuple[float, float]
  734.     ) -> Tuple[float, float]:
  735.         """
  736.        Compute amplitude-amplitude coupling between two frequency bands.
  737.  
  738.        Args:
  739.            signal1 (np.ndarray): First input signal
  740.            signal2 (np.ndarray): Second input signal
  741.            band1 (Tuple[float, float]): Frequency band for first signal
  742.            band2 (Tuple[float, float]): Frequency band for second signal
  743.  
  744.        Returns:
  745.            Tuple[float, float]: Correlation coefficient and p-value
  746.        """
  747.         # Filter signals and get amplitude envelopes
  748.         sig1_filt = self.bandpass(signal1, band1)
  749.         sig2_filt = self.bandpass(signal2, band2)
  750.        
  751.         amp1 = np.abs(signal.hilbert(sig1_filt))
  752.         amp2 = np.abs(signal.hilbert(sig2_filt))
  753.  
  754.         # Compute correlation
  755.         corr = np.corrcoef(amp1, amp2)[0, 1]
  756.  
  757.         # Surrogate testing
  758.         surrogate_corrs = []
  759.         for _ in range(self.n_surrogates):
  760.             shuffled_amp2 = np.random.permutation(amp2)
  761.             corr_surr = np.corrcoef(amp1, shuffled_amp2)[0, 1]
  762.             surrogate_corrs.append(corr_surr)
  763.  
  764.         p_value = np.mean(np.array(surrogate_corrs) >= corr)
  765.         return corr, p_value
  766.  
  767.     def detect_artifacts(
  768.         self,
  769.         eeg_data: np.ndarray,
  770.         channel_name: str
  771.     ) -> Dict[str, List[Tuple[float, str, float]]]:
  772.         """
  773.        Detect potential artifacts that could affect PAC analysis.
  774.  
  775.        Args:
  776.            eeg_data (np.ndarray): EEG signal
  777.            channel_name (str): Name of the channel
  778.  
  779.        Returns:
  780.            Dict[str, List[Tuple[float, str, float]]]: Dictionary of artifact detections
  781.                with timestamps, types, and severity scores
  782.        """
  783.         artifacts = {
  784.             'muscle': [],
  785.             'movement': [],
  786.             'line_noise': [],
  787.             'saturation': []
  788.         }
  789.  
  790.         # Window parameters
  791.         win_len = int(0.5 * self.sfreq)  # 500ms windows
  792.         step = int(0.1 * self.sfreq)  # 100ms steps
  793.  
  794.         for start in range(0, len(eeg_data) - win_len, step):
  795.             window = eeg_data[start:start + win_len]
  796.             time = start / self.sfreq
  797.  
  798.             # Muscle artifact detection (high frequency power)
  799.             gamma_power = np.mean(np.abs(self.bandpass(window, (30, 100))))
  800.             if gamma_power > np.std(eeg_data) * 3:
  801.                 artifacts['muscle'].append((time, 'EMG', gamma_power))
  802.  
  803.             # Movement artifact detection (sudden amplitude changes)
  804.             amp_diff = np.diff(window)
  805.             if np.max(np.abs(amp_diff)) > np.std(eeg_data) * 4:
  806.                 artifacts['movement'].append((time, 'Movement', np.max(np.abs(amp_diff))))
  807.  
  808.             # Line noise detection (50/60 Hz)
  809.             fft = np.fft.fft(window)
  810.             freqs = np.fft.fftfreq(len(window), 1/self.sfreq)
  811.             line_freq_mask = (np.abs(freqs - 50) < 1) | (np.abs(freqs - 60) < 1)
  812.             line_power = np.mean(np.abs(fft[line_freq_mask]))
  813.             if line_power > np.mean(np.abs(fft)) * 2:
  814.                 artifacts['line_noise'].append((time, 'Line Noise', line_power))
  815.  
  816.             # Saturation detection
  817.             if np.max(np.abs(window)) > np.std(eeg_data) * 5:
  818.                 artifacts['saturation'].append((time, 'Saturation', np.max(np.abs(window))))
  819.  
  820.         return artifacts
  821.  
  822.     def validate_pac_computation(
  823.         self,
  824.         eeg_data: np.ndarray,
  825.         phase_band: str,
  826.         amplitude_band: str
  827.     ) -> Dict[str, Union[bool, str, float]]:
  828.         """
  829.        Validate PAC computation and check for potential issues.
  830.  
  831.        Args:
  832.            eeg_data (np.ndarray): EEG signal
  833.            phase_band (str): Band for phase
  834.            amplitude_band (str): Band for amplitude
  835.  
  836.        Returns:
  837.            Dict[str, Union[bool, str, float]]: Validation results
  838.        """
  839.         results = {
  840.             'is_valid': True,
  841.             'warnings': [],
  842.             'phase_continuity': 0.0,
  843.             'amplitude_stability': 0.0
  844.         }
  845.  
  846.         # Check signal length
  847.         if len(eeg_data) < 5 * self.sfreq:  # Minimum 5 seconds
  848.             results['is_valid'] = False
  849.             results['warnings'].append("Signal too short for reliable PAC computation")
  850.             return results
  851.  
  852.         # Phase signal validation
  853.         phase_signal = self.bandpass(eeg_data, self.bands[phase_band])
  854.         phase = np.angle(signal.hilbert(phase_signal))
  855.         phase_diff = np.diff(phase)
  856.         phase_diff = np.mod(phase_diff + np.pi, 2 * np.pi) - np.pi
  857.         phase_continuity = 1 - (np.std(phase_diff) / np.pi)
  858.         results['phase_continuity'] = phase_continuity
  859.  
  860.         if phase_continuity < 0.7:
  861.             results['warnings'].append("Poor phase continuity detected")
  862.  
  863.         # Amplitude signal validation
  864.         amp_signal = self.bandpass(eeg_data, self.bands[amplitude_band])
  865.         amplitude = np.abs(signal.hilbert(amp_signal))
  866.         amp_stability = 1 - (np.std(amplitude) / np.mean(amplitude))
  867.         results['amplitude_stability'] = amp_stability
  868.  
  869.         if amp_stability < 0.5:
  870.             results['warnings'].append("Unstable amplitude envelope detected")
  871.  
  872.         # Check for edge effects
  873.         edge_samples = int(0.1 * self.sfreq)  # 100ms
  874.         edge_var_ratio = (np.var(phase_signal[:edge_samples]) + np.var(phase_signal[-edge_samples:])) / \
  875.                         (2 * np.var(phase_signal[edge_samples:-edge_samples]))
  876.        
  877.         if edge_var_ratio > 2.0:
  878.             results['warnings'].append("Filter edge effects detected")
  879.  
  880.         return results
  881.  
  882.     def flag_sequences_enhanced(
  883.         self,
  884.         eeg_data: np.ndarray,
  885.         phase_band: str = 'theta',
  886.         amplitude_band: str = 'gamma',
  887.         channel_name: str = 'Unknown',
  888.         task_timestamps: Optional[List[Tuple[float, float]]] = None,
  889.         artifact_rejection: bool = True,
  890.         validation_check: bool = True
  891.     ) -> Dict[str, Any]:
  892.         """
  893.        Enhanced version of flag_sequences with additional detection vectors.
  894.  
  895.        Args:
  896.            eeg_data (np.ndarray): EEG signal
  897.            phase_band (str): Band for phase
  898.            amplitude_band (str): Band for amplitude
  899.            channel_name (str): Name of the channel
  900.            task_timestamps (Optional[List[Tuple[float, float]]]): Task segments
  901.            artifact_rejection (bool): Whether to perform artifact rejection
  902.            validation_check (bool): Whether to validate PAC computation
  903.  
  904.        Returns:
  905.            Dict[str, Any]: Enhanced flagging results including artifacts and validation
  906.        """
  907.         results = {
  908.             'flags': [],
  909.             'pac_scores': [],
  910.             'artifacts': None,
  911.             'validation': None,
  912.             'warnings': []
  913.         }
  914.  
  915.         # Artifact detection if enabled
  916.         if artifact_rejection:
  917.             artifacts = self.detect_artifacts(eeg_data, channel_name)
  918.             results['artifacts'] = artifacts
  919.            
  920.             # Create artifact-free signal
  921.             artifact_mask = np.ones(len(eeg_data), dtype=bool)
  922.             for artifact_type in artifacts.values():
  923.                 for t, _, _ in artifact_type:
  924.                     start_idx = int(t * self.sfreq)
  925.                     end_idx = start_idx + int(0.5 * self.sfreq)
  926.                     artifact_mask[start_idx:end_idx] = False
  927.            
  928.             eeg_data = eeg_data[artifact_mask]
  929.             if len(eeg_data) < 5 * self.sfreq:
  930.                 results['warnings'].append("Insufficient clean data after artifact rejection")
  931.                 return results
  932.  
  933.         # Validation check if enabled
  934.         if validation_check:
  935.             validation = self.validate_pac_computation(eeg_data, phase_band, amplitude_band)
  936.             results['validation'] = validation
  937.             if not validation['is_valid']:
  938.                 results['warnings'].extend(validation['warnings'])
  939.                 return results
  940.  
  941.         # Compute standard PAC flags
  942.         flags, pac_scores = self.flag_sequences(
  943.             eeg_data,
  944.             phase_band,
  945.             amplitude_band,
  946.             channel_name,
  947.             task_timestamps
  948.         )
  949.         results['flags'] = flags
  950.         results['pac_scores'] = pac_scores
  951.  
  952.         # Additional coupling measures
  953.         if len(eeg_data) >= 5 * self.sfreq:
  954.             # Phase-phase coupling between adjacent bands
  955.             phase_phase_results = {}
  956.             for band1, band2 in [('theta', 'alpha'), ('alpha', 'beta')]:
  957.                 plv, p_val = self.compute_phase_phase_coupling(
  958.                     eeg_data, eeg_data,
  959.                     self.bands[band1], self.bands[band2]
  960.                 )
  961.                 phase_phase_results[f"{band1}_{band2}"] = {'plv': plv, 'p_value': p_val}
  962.             results['phase_phase_coupling'] = phase_phase_results
  963.  
  964.             # Amplitude-amplitude coupling
  965.             amp_amp_results = {}
  966.             for band1, band2 in [('theta', 'gamma'), ('alpha', 'gamma')]:
  967.                 corr, p_val = self.compute_amplitude_amplitude_coupling(
  968.                     eeg_data, eeg_data,
  969.                     self.bands[band1], self.bands[band2]
  970.                 )
  971.                 amp_amp_results[f"{band1}_{band2}"] = {'correlation': corr, 'p_value': p_val}
  972.             results['amplitude_amplitude_coupling'] = amp_amp_results
  973.  
  974.         return results
  975.  
  976.     def compute_mvl_pac(
  977.         self,
  978.         phase_signal: np.ndarray,
  979.         amplitude_signal: np.ndarray
  980.     ) -> Tuple[float, float]:
  981.         """
  982.        Compute PAC using Mean Vector Length method (Canolty et al., 2006).
  983.  
  984.        Args:
  985.            phase_signal (np.ndarray): Phase signal
  986.            amplitude_signal (np.ndarray): Amplitude signal
  987.  
  988.        Returns:
  989.            Tuple[float, float]: MVL modulation index and p-value
  990.        """
  991.         # Compute complex-valued composite signal
  992.         z = amplitude_signal * np.exp(1j * phase_signal)
  993.         mvl = np.abs(np.mean(z))
  994.        
  995.         # Surrogate testing
  996.         surrogate_mvls = []
  997.         for _ in range(self.n_surrogates):
  998.             shuffled_phase = np.random.permutation(phase_signal)
  999.             z_surr = amplitude_signal * np.exp(1j * shuffled_phase)
  1000.             surrogate_mvls.append(np.abs(np.mean(z_surr)))
  1001.        
  1002.         p_value = np.mean(np.array(surrogate_mvls) >= mvl)
  1003.         return mvl, p_value
  1004.  
  1005.     def compute_glm_pac(
  1006.         self,
  1007.         phase_signal: np.ndarray,
  1008.         amplitude_signal: np.ndarray
  1009.     ) -> Tuple[float, float]:
  1010.         """
  1011.        Compute PAC using General Linear Model method (Penny et al., 2008).
  1012.  
  1013.        Args:
  1014.            phase_signal (np.ndarray): Phase signal
  1015.            amplitude_signal (np.ndarray): Amplitude signal
  1016.  
  1017.        Returns:
  1018.            Tuple[float, float]: GLM modulation index and p-value
  1019.        """
  1020.         # Prepare design matrix
  1021.         X = np.column_stack([
  1022.             np.cos(phase_signal),
  1023.             np.sin(phase_signal),
  1024.             np.ones_like(phase_signal)
  1025.         ])
  1026.        
  1027.         # Fit GLM
  1028.         beta = np.linalg.lstsq(X, amplitude_signal, rcond=None)[0]
  1029.         predicted = X @ beta
  1030.         residuals = amplitude_signal - predicted
  1031.         r_squared = 1 - np.var(residuals) / np.var(amplitude_signal)
  1032.        
  1033.         # Surrogate testing
  1034.         surrogate_r2s = []
  1035.         for _ in range(self.n_surrogates):
  1036.             shuffled_phase = np.random.permutation(phase_signal)
  1037.             X_surr = np.column_stack([
  1038.                 np.cos(shuffled_phase),
  1039.                 np.sin(shuffled_phase),
  1040.                 np.ones_like(shuffled_phase)
  1041.             ])
  1042.             beta_surr = np.linalg.lstsq(X_surr, amplitude_signal, rcond=None)[0]
  1043.             predicted_surr = X_surr @ beta_surr
  1044.             residuals_surr = amplitude_signal - predicted_surr
  1045.             r2_surr = 1 - np.var(residuals_surr) / np.var(amplitude_signal)
  1046.             surrogate_r2s.append(r2_surr)
  1047.        
  1048.         p_value = np.mean(np.array(surrogate_r2s) >= r_squared)
  1049.         return r_squared, p_value
  1050.  
  1051.     def compute_cluster_stats(
  1052.         self,
  1053.         coupling_scores: np.ndarray,
  1054.         threshold: float,
  1055.         n_permutations: int = 1000
  1056.     ) -> Dict[str, Any]:
  1057.         """
  1058.        Perform cluster-based permutation test for coupling scores.
  1059.  
  1060.        Args:
  1061.            coupling_scores (np.ndarray): Array of coupling scores
  1062.            threshold (float): Initial threshold for cluster formation
  1063.            n_permutations (int): Number of permutations for testing
  1064.  
  1065.        Returns:
  1066.            Dict[str, Any]: Dictionary containing cluster statistics
  1067.        """
  1068.         def find_clusters(data: np.ndarray, thresh: float) -> List[Tuple[int, int]]:
  1069.             """Find continuous clusters above threshold."""
  1070.             clusters = []
  1071.             current_cluster = None
  1072.            
  1073.             for i, value in enumerate(data):
  1074.                 if value > thresh:
  1075.                     if current_cluster is None:
  1076.                         current_cluster = [i]
  1077.                     else:
  1078.                         current_cluster.append(i)
  1079.                 elif current_cluster is not None:
  1080.                     clusters.append((current_cluster[0], current_cluster[-1]))
  1081.                     current_cluster = None
  1082.            
  1083.             if current_cluster is not None:
  1084.                 clusters.append((current_cluster[0], current_cluster[-1]))
  1085.            
  1086.             return clusters
  1087.  
  1088.         # Find observed clusters
  1089.         observed_clusters = find_clusters(coupling_scores, threshold)
  1090.         cluster_stats = {
  1091.             'clusters': [],
  1092.             'significant_clusters': [],
  1093.             'max_stat_dist': []
  1094.         }
  1095.  
  1096.         if not observed_clusters:
  1097.             return cluster_stats
  1098.  
  1099.         # Compute cluster statistics
  1100.         for start, end in observed_clusters:
  1101.             cluster_sum = np.sum(coupling_scores[start:end+1])
  1102.             cluster_stats['clusters'].append({
  1103.                 'start': start,
  1104.                 'end': end,
  1105.                 'sum_stat': cluster_sum
  1106.             })
  1107.  
  1108.         # Permutation testing
  1109.         max_cluster_sums = []
  1110.         for _ in range(n_permutations):
  1111.             perm_scores = np.random.permutation(coupling_scores)
  1112.             perm_clusters = find_clusters(perm_scores, threshold)
  1113.             if perm_clusters:
  1114.                 max_sum = max(
  1115.                     np.sum(perm_scores[start:end+1])
  1116.                     for start, end in perm_clusters
  1117.                 )
  1118.                 max_cluster_sums.append(max_sum)
  1119.             else:
  1120.                 max_cluster_sums.append(0)
  1121.  
  1122.         # Determine significance
  1123.         max_cluster_sums = np.array(max_cluster_sums)
  1124.         for cluster in cluster_stats['clusters']:
  1125.             p_value = np.mean(max_cluster_sums >= cluster['sum_stat'])
  1126.             if p_value < self.p_value:
  1127.                 cluster_stats['significant_clusters'].append({
  1128.                     'start': cluster['start'],
  1129.                     'end': cluster['end'],
  1130.                     'sum_stat': cluster['sum_stat'],
  1131.                     'p_value': p_value
  1132.                 })
  1133.  
  1134.         cluster_stats['max_stat_dist'] = max_cluster_sums.tolist()
  1135.         return cluster_stats
  1136.  
  1137.     def compute_advanced_coupling_stats(
  1138.         self,
  1139.         eeg_data: np.ndarray,
  1140.         channel_name: str,
  1141.         phase_band: str = 'theta',
  1142.         amplitude_band: str = 'gamma'
  1143.     ) -> Dict[str, Any]:
  1144.         """
  1145.        Compute comprehensive coupling statistics using multiple methods.
  1146.  
  1147.        Args:
  1148.            eeg_data (np.ndarray): EEG signal
  1149.            channel_name (str): Channel name
  1150.            phase_band (str): Phase frequency band
  1151.            amplitude_band (str): Amplitude frequency band
  1152.  
  1153.        Returns:
  1154.            Dict[str, Any]: Dictionary containing all coupling metrics
  1155.        """
  1156.         # Filter signals
  1157.         phase_signal = self.bandpass(eeg_data, self.bands[phase_band])
  1158.         amplitude_signal = self.bandpass(eeg_data, self.bands[amplitude_band])
  1159.        
  1160.         # Extract phase and amplitude
  1161.         phase = np.angle(signal.hilbert(phase_signal))
  1162.         amplitude = np.abs(signal.hilbert(amplitude_signal))
  1163.        
  1164.         # Compute coupling using different methods
  1165.         klmi, klmi_p = self.compute_phase_amplitude_coupling(phase, amplitude)
  1166.         mvl, mvl_p = self.compute_mvl_pac(phase, amplitude)
  1167.         glm, glm_p = self.compute_glm_pac(phase, amplitude)
  1168.        
  1169.         # Combine scores for cluster analysis
  1170.         coupling_scores = np.array([klmi, mvl, glm])
  1171.         cluster_stats = self.compute_cluster_stats(
  1172.             coupling_scores,
  1173.             threshold=self.threshold
  1174.         )
  1175.        
  1176.         return {
  1177.             'channel': channel_name,
  1178.             'phase_band': phase_band,
  1179.             'amplitude_band': amplitude_band,
  1180.             'methods': {
  1181.                 'klmi': {'score': klmi, 'p_value': klmi_p},
  1182.                 'mvl': {'score': mvl, 'p_value': mvl_p},
  1183.                 'glm': {'score': glm, 'p_value': glm_p}
  1184.             },
  1185.             'cluster_stats': cluster_stats,
  1186.             'consensus_score': np.mean([klmi, mvl, glm]),
  1187.             'method_agreement': np.std([klmi, mvl, glm])
  1188.         }
  1189.  
  1190.     def flag_sequences_advanced(
  1191.         self,
  1192.         eeg_data: np.ndarray,
  1193.         phase_band: str = 'theta',
  1194.         amplitude_band: str = 'gamma',
  1195.         channel_name: str = 'Unknown',
  1196.         task_timestamps: Optional[List[Tuple[float, float]]] = None,
  1197.         min_agreement: float = 0.5
  1198.     ) -> Dict[str, Any]:
  1199.         """
  1200.        Advanced flagging using multiple coupling methods and statistical validation.
  1201.  
  1202.        Args:
  1203.            eeg_data (np.ndarray): EEG signal
  1204.            phase_band (str): Phase frequency band
  1205.            amplitude_band (str): Amplitude frequency band
  1206.            channel_name (str): Channel name
  1207.            task_timestamps (Optional[List[Tuple[float, float]]]): Task segments
  1208.            min_agreement (float): Minimum agreement between methods
  1209.  
  1210.        Returns:
  1211.            Dict[str, Any]: Dictionary containing flagged windows and statistics
  1212.        """
  1213.         win_len = int(2 * self.sfreq)
  1214.         step = int(0.5 * self.sfreq)
  1215.         results = {
  1216.             'flags': [],
  1217.             'method_scores': [],
  1218.             'cluster_stats': [],
  1219.             'consensus_flags': []
  1220.         }
  1221.        
  1222.         if task_timestamps:
  1223.             for start_t, end_t in task_timestamps:
  1224.                 start = int(start_t * self.sfreq)
  1225.                 end = min(int(end_t * self.sfreq), len(eeg_data))
  1226.                 for win_start in range(start, end - win_len, step):
  1227.                     win_end = win_start + win_len
  1228.                     window_stats = self.compute_advanced_coupling_stats(
  1229.                         eeg_data[win_start:win_end],
  1230.                         channel_name,
  1231.                         phase_band,
  1232.                         amplitude_band
  1233.                     )
  1234.                    
  1235.                     # Check if methods agree on significance
  1236.                     significant_methods = sum(
  1237.                         1 for method in window_stats['methods'].values()
  1238.                         if method['score'] > self.threshold and method['p_value'] < self.p_value
  1239.                     )
  1240.                     method_agreement = significant_methods / len(window_stats['methods'])
  1241.                    
  1242.                     if method_agreement >= min_agreement:
  1243.                         results['consensus_flags'].append((
  1244.                             win_start / self.sfreq,
  1245.                             win_end / self.sfreq,
  1246.                             window_stats['consensus_score'],
  1247.                             window_stats['method_agreement']
  1248.                         ))
  1249.                    
  1250.                     results['method_scores'].append(window_stats['methods'])
  1251.                     results['cluster_stats'].append(window_stats['cluster_stats'])
  1252.         else:
  1253.             for start in range(0, len(eeg_data) - win_len, step):
  1254.                 end = start + win_len
  1255.                 window_stats = self.compute_advanced_coupling_stats(
  1256.                     eeg_data[start:end],
  1257.                     channel_name,
  1258.                     phase_band,
  1259.                     amplitude_band
  1260.                 )
  1261.                
  1262.                 significant_methods = sum(
  1263.                     1 for method in window_stats['methods'].values()
  1264.                     if method['score'] > self.threshold and method['p_value'] < self.p_value
  1265.                 )
  1266.                 method_agreement = significant_methods / len(window_stats['methods'])
  1267.                
  1268.                 if method_agreement >= min_agreement:
  1269.                     results['consensus_flags'].append((
  1270.                         start / self.sfreq,
  1271.                         end / self.sfreq,
  1272.                         window_stats['consensus_score'],
  1273.                         window_stats['method_agreement']
  1274.                     ))
  1275.                
  1276.                 results['method_scores'].append(window_stats['methods'])
  1277.                 results['cluster_stats'].append(window_stats['cluster_stats'])
  1278.        
  1279.         logger.info(
  1280.             f"Flagged {len(results['consensus_flags'])} consensus windows "
  1281.             f"for {channel_name} ({phase_band}-{amplitude_band})."
  1282.         )
  1283.         return results
  1284.  
  1285.     def detect_artifacts_ml(
  1286.         self,
  1287.         eeg_data: np.ndarray,
  1288.         channel_name: str,
  1289.         classifier_type: str = 'svm',
  1290.         feature_set: str = 'standard'
  1291.     ) -> Dict[str, List[Tuple[float, str, float, float]]]:
  1292.         """
  1293.        Enhanced artifact detection using machine learning classifiers.
  1294.  
  1295.        Args:
  1296.            eeg_data (np.ndarray): EEG signal
  1297.            channel_name (str): Channel name
  1298.            classifier_type (str): Type of classifier ('svm', 'rf', 'nn')
  1299.            feature_set (str): Feature set to use ('standard', 'extended')
  1300.  
  1301.        Returns:
  1302.            Dict[str, List[Tuple[float, str, float, float]]]:
  1303.                Dictionary of artifact detections with timestamps, types,
  1304.                severity scores, and confidence scores
  1305.        """
  1306.         artifacts = {
  1307.             'muscle': [],
  1308.             'movement': [],
  1309.             'line_noise': [],
  1310.             'saturation': [],
  1311.             'blink': [],
  1312.             'saccade': []
  1313.         }
  1314.  
  1315.         # Window parameters
  1316.         win_len = int(0.5 * self.sfreq)  # 500ms windows
  1317.         step = int(0.1 * self.sfreq)  # 100ms steps
  1318.  
  1319.         def extract_features(window: np.ndarray) -> np.ndarray:
  1320.             """Extract features for artifact detection."""
  1321.             features = []
  1322.            
  1323.             # Time domain features
  1324.             features.extend([
  1325.                 np.mean(window),
  1326.                 np.std(window),
  1327.                 np.max(np.abs(window)),
  1328.                 np.mean(np.abs(np.diff(window))),
  1329.                 scipy.stats.skew(window),
  1330.                 scipy.stats.kurtosis(window)
  1331.             ])
  1332.            
  1333.             # Frequency domain features
  1334.             freqs, psd = signal.welch(window, self.sfreq, nperseg=min(256, len(window)))
  1335.             total_power = np.sum(psd)
  1336.             delta_power = np.sum(psd[(freqs >= 1) & (freqs <= 4)]) / total_power
  1337.             theta_power = np.sum(psd[(freqs >= 4) & (freqs <= 8)]) / total_power
  1338.             alpha_power = np.sum(psd[(freqs >= 8) & (freqs <= 13)]) / total_power
  1339.             beta_power = np.sum(psd[(freqs >= 13) & (freqs <= 30)]) / total_power
  1340.             gamma_power = np.sum(psd[freqs >= 30]) / total_power
  1341.             line_power = np.sum(psd[(freqs >= 49) & (freqs <= 51)]) / total_power
  1342.            
  1343.             features.extend([
  1344.                 delta_power, theta_power, alpha_power,
  1345.                 beta_power, gamma_power, line_power
  1346.             ])
  1347.            
  1348.             if feature_set == 'extended':
  1349.                 # Wavelet features
  1350.                 coeffs = pywt.wavedec(window, 'db4', level=4)
  1351.                 features.extend([np.std(c) for c in coeffs])
  1352.                
  1353.                 # Hjorth parameters
  1354.                 activity = np.var(window)
  1355.                 mobility = np.sqrt(np.var(np.diff(window)) / activity)
  1356.                 complexity = np.sqrt(np.var(np.diff(np.diff(window))) / np.var(np.diff(window))) / mobility
  1357.                 features.extend([activity, mobility, complexity])
  1358.            
  1359.             return np.array(features)
  1360.  
  1361.         def get_classifier():
  1362.             """Initialize the selected classifier."""
  1363.             if classifier_type == 'svm':
  1364.                 return sklearn.svm.SVC(probability=True)
  1365.             elif classifier_type == 'rf':
  1366.                 return sklearn.ensemble.RandomForestClassifier()
  1367.             elif classifier_type == 'nn':
  1368.                 return sklearn.neural_network.MLPClassifier()
  1369.             else:
  1370.                 raise ValueError(f"Unsupported classifier type: {classifier_type}")
  1371.  
  1372.         # Initialize classifier (in practice, this would be pre-trained)
  1373.         classifier = get_classifier()
  1374.  
  1375.         for start in range(0, len(eeg_data) - win_len, step):
  1376.             window = eeg_data[start:start + win_len]
  1377.             time = start / self.sfreq
  1378.            
  1379.             # Extract features
  1380.             features = extract_features(window)
  1381.            
  1382.             # Get classifier predictions and probabilities
  1383.             artifact_type = classifier.predict([features])[0]
  1384.             probabilities = classifier.predict_proba([features])[0]
  1385.             confidence = np.max(probabilities)
  1386.            
  1387.             # Compute severity score based on feature values
  1388.             severity = np.mean([
  1389.                 np.abs(features[0]) / np.std(eeg_data),  # Normalized amplitude
  1390.                 features[5],  # Gamma power ratio
  1391.                 features[4]   # Line noise ratio
  1392.             ])
  1393.            
  1394.             if confidence > 0.8:  # High confidence threshold
  1395.                 artifacts[artifact_type].append((time, artifact_type, severity, confidence))
  1396.  
  1397.         return artifacts
  1398.  
  1399.     def plot_advanced_coupling(
  1400.         self,
  1401.         eeg_data: np.ndarray,
  1402.         channel_name: str,
  1403.         output_dir: Union[str, Path],
  1404.         phase_band: str = 'theta',
  1405.         amplitude_band: str = 'gamma',
  1406.         plot_type: str = 'all'
  1407.     ) -> Dict[str, str]:
  1408.         """
  1409.        Generate advanced coupling visualizations.
  1410.  
  1411.        Args:
  1412.            eeg_data (np.ndarray): EEG signal
  1413.            channel_name (str): Channel name
  1414.            output_dir (Union[str, Path]): Output directory
  1415.            phase_band (str): Phase frequency band
  1416.            amplitude_band (str): Amplitude frequency band
  1417.            plot_type (str): Type of plots to generate ('all', 'pac', 'ppc', 'aac')
  1418.  
  1419.        Returns:
  1420.            Dict[str, str]: Dictionary of plot paths
  1421.        """
  1422.         output_dir = Path(output_dir)
  1423.         output_dir.mkdir(parents=True, exist_ok=True)
  1424.         plot_paths = {}
  1425.  
  1426.         if plot_type in ['all', 'pac']:
  1427.             # Enhanced PAC visualization
  1428.             phase_signal = self.bandpass(eeg_data, self.bands[phase_band])
  1429.             amplitude_signal = self.bandpass(eeg_data, self.bands[amplitude_band])
  1430.             phase = np.angle(signal.hilbert(phase_signal))
  1431.             amplitude = np.abs(signal.hilbert(amplitude_signal))
  1432.  
  1433.             plt.figure(figsize=(15, 10))
  1434.            
  1435.             # Main plot: Phase-amplitude distribution
  1436.             plt.subplot(2, 2, 1)
  1437.             n_bins = 18
  1438.             phase_bins = np.linspace(-np.pi, np.pi, n_bins + 1)
  1439.             mean_amp = np.zeros(n_bins)
  1440.             std_amp = np.zeros(n_bins)
  1441.            
  1442.             for i in range(n_bins):
  1443.                 idx = np.where((phase >= phase_bins[i]) & (phase < phase_bins[i + 1]))[0]
  1444.                 if len(idx) > 0:
  1445.                     mean_amp[i] = np.mean(amplitude[idx])
  1446.                     std_amp[i] = np.std(amplitude[idx])
  1447.            
  1448.             plt.errorbar(phase_bins[:-1], mean_amp, yerr=std_amp, fmt='o-', capsize=5)
  1449.             plt.xlabel(f'{phase_band.capitalize()} Phase (rad)')
  1450.             plt.ylabel(f'{amplitude_band.capitalize()} Amplitude')
  1451.             plt.title('Phase-Amplitude Distribution')
  1452.            
  1453.             # Subplot: Time-frequency representation
  1454.             plt.subplot(2, 2, 2)
  1455.             f, t, Sxx = signal.spectrogram(eeg_data, fs=self.sfreq, nperseg=256)
  1456.             plt.pcolormesh(t, f, 10 * np.log10(Sxx))
  1457.             plt.ylabel('Frequency (Hz)')
  1458.             plt.xlabel('Time (s)')
  1459.             plt.title('Time-Frequency Representation')
  1460.            
  1461.             # Subplot: Phase histogram
  1462.             plt.subplot(2, 2, 3)
  1463.             plt.hist(phase, bins=36, density=True)
  1464.             plt.xlabel('Phase (rad)')
  1465.             plt.ylabel('Density')
  1466.             plt.title(f'{phase_band.capitalize()} Phase Distribution')
  1467.            
  1468.             # Subplot: Amplitude histogram
  1469.             plt.subplot(2, 2, 4)
  1470.             plt.hist(amplitude, bins=50, density=True)
  1471.             plt.xlabel('Amplitude')
  1472.             plt.ylabel('Density')
  1473.             plt.title(f'{amplitude_band.capitalize()} Amplitude Distribution')
  1474.            
  1475.             plt.tight_layout()
  1476.             plot_path = output_dir / f"advanced_pac_{channel_name}_{phase_band}_{amplitude_band}.png"
  1477.             plt.savefig(plot_path, dpi=300)
  1478.             plt.close()
  1479.             plot_paths['pac'] = str(plot_path)
  1480.  
  1481.         if plot_type in ['all', 'ppc']:
  1482.             # Phase-Phase Coupling visualization
  1483.             plt.figure(figsize=(10, 8))
  1484.             freqs = np.array(list(self.bands.values()))
  1485.             n_freqs = len(freqs)
  1486.             ppc_matrix = np.zeros((n_freqs, n_freqs))
  1487.            
  1488.             for i, (band1_name, band1) in enumerate(self.bands.items()):
  1489.                 for j, (band2_name, band2) in enumerate(self.bands.items()):
  1490.                     if i < j:  # Upper triangle only
  1491.                         plv, _ = self.compute_phase_phase_coupling(
  1492.                             eeg_data, eeg_data, band1, band2
  1493.                         )
  1494.                         ppc_matrix[i, j] = plv
  1495.            
  1496.             plt.imshow(ppc_matrix, cmap='viridis', aspect='auto')
  1497.             plt.colorbar(label='Phase Locking Value')
  1498.             plt.xticks(range(n_freqs), self.bands.keys())
  1499.             plt.yticks(range(n_freqs), self.bands.keys())
  1500.             plt.title(f'Phase-Phase Coupling Matrix ({channel_name})')
  1501.            
  1502.             plot_path = output_dir / f"ppc_matrix_{channel_name}.png"
  1503.             plt.savefig(plot_path, dpi=300)
  1504.             plt.close()
  1505.             plot_paths['ppc'] = str(plot_path)
  1506.  
  1507.         if plot_type in ['all', 'aac']:
  1508.             # Amplitude-Amplitude Coupling visualization
  1509.             plt.figure(figsize=(10, 8))
  1510.             aac_matrix = np.zeros((n_freqs, n_freqs))
  1511.            
  1512.             for i, (band1_name, band1) in enumerate(self.bands.items()):
  1513.                 for j, (band2_name, band2) in enumerate(self.bands.items()):
  1514.                     if i < j:  # Upper triangle only
  1515.                         corr, _ = self.compute_amplitude_amplitude_coupling(
  1516.                             eeg_data, eeg_data, band1, band2
  1517.                         )
  1518.                         aac_matrix[i, j] = corr
  1519.            
  1520.             plt.imshow(aac_matrix, cmap='RdBu_r', aspect='auto', vmin=-1, vmax=1)
  1521.             plt.colorbar(label='Correlation Coefficient')
  1522.             plt.xticks(range(n_freqs), self.bands.keys())
  1523.             plt.yticks(range(n_freqs), self.bands.keys())
  1524.             plt.title(f'Amplitude-Amplitude Coupling Matrix ({channel_name})')
  1525.            
  1526.             plot_path = output_dir / f"aac_matrix_{channel_name}.png"
  1527.             plt.savefig(plot_path, dpi=300)
  1528.             plt.close()
  1529.             plot_paths['aac'] = str(plot_path)
  1530.  
  1531.         return plot_paths
  1532.  
  1533.     def generate_comprehensive_report(
  1534.         self,
  1535.         eeg_data: np.ndarray,
  1536.         channel_name: str,
  1537.         output_dir: Union[str, Path],
  1538.         phase_band: str = 'theta',
  1539.         amplitude_band: str = 'gamma'
  1540.     ) -> str:
  1541.         """
  1542.        Generate a comprehensive analysis report including artifacts and coupling.
  1543.  
  1544.        Args:
  1545.            eeg_data (np.ndarray): EEG signal
  1546.            channel_name (str): Channel name
  1547.            output_dir (Union[str, Path]): Output directory
  1548.            phase_band (str): Phase frequency band
  1549.            amplitude_band (str): Amplitude frequency band
  1550.  
  1551.        Returns:
  1552.            str: Path to the generated HTML report
  1553.        """
  1554.         output_dir = Path(output_dir)
  1555.         output_dir.mkdir(parents=True, exist_ok=True)
  1556.  
  1557.         # Run all analyses
  1558.         artifacts = self.detect_artifacts_ml(eeg_data, channel_name)
  1559.         coupling_stats = self.compute_advanced_coupling_stats(
  1560.             eeg_data, channel_name, phase_band, amplitude_band
  1561.         )
  1562.         plot_paths = self.plot_advanced_coupling(
  1563.             eeg_data, channel_name, output_dir, phase_band, amplitude_band
  1564.         )
  1565.  
  1566.         # Generate HTML report
  1567.         html_lines = []
  1568.         html_lines.append("<html><body>")
  1569.         html_lines.append(f"<h1>Comprehensive Analysis Report: {channel_name}</h1>")
  1570.        
  1571.         # Artifact section
  1572.         html_lines.append("<h2>Artifact Detection</h2>")
  1573.         html_lines.append("<table border='1'>")
  1574.         html_lines.append("<tr><th>Time (s)</th><th>Type</th><th>Severity</th><th>Confidence</th></tr>")
  1575.         for artifact_type, detections in artifacts.items():
  1576.             for time, type_name, severity, confidence in detections:
  1577.                 html_lines.append(
  1578.                     f"<tr><td>{time:.2f}</td><td>{type_name}</td>"
  1579.                     f"<td>{severity:.2f}</td><td>{confidence:.2f}</td></tr>"
  1580.                 )
  1581.         html_lines.append("</table>")
  1582.        
  1583.         # Coupling analysis section
  1584.         html_lines.append("<h2>Coupling Analysis</h2>")
  1585.         html_lines.append("<h3>Method Comparison</h3>")
  1586.         html_lines.append("<table border='1'>")
  1587.         html_lines.append("<tr><th>Method</th><th>Score</th><th>P-value</th></tr>")
  1588.         for method, results in coupling_stats['methods'].items():
  1589.             html_lines.append(
  1590.                 f"<tr><td>{method.upper()}</td>"
  1591.                 f"<td>{results['score']:.3f}</td>"
  1592.                 f"<td>{results['p_value']:.3f}</td></tr>"
  1593.             )
  1594.         html_lines.append("</table>")
  1595.        
  1596.         # Consensus metrics
  1597.         html_lines.append("<h3>Consensus Metrics</h3>")
  1598.         html_lines.append(f"<p>Consensus Score: {coupling_stats['consensus_score']:.3f}</p>")
  1599.         html_lines.append(f"<p>Method Agreement: {coupling_stats['method_agreement']:.3f}</p>")
  1600.        
  1601.         # Visualizations
  1602.         html_lines.append("<h2>Visualizations</h2>")
  1603.         for plot_type, path in plot_paths.items():
  1604.             html_lines.append(f"<h3>{plot_type.upper()} Analysis</h3>")
  1605.             html_lines.append(f"<img src='{path}' style='max-width:800px'>")
  1606.        
  1607.         html_lines.append("</body></html>")
  1608.        
  1609.         # Save report
  1610.         report_path = output_dir / f"comprehensive_report_{channel_name}.html"
  1611.         with open(report_path, 'w') as f:
  1612.             f.write("\n".join(html_lines))
  1613.        
  1614.         return str(report_path)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement