El_Chaderino

EDF to EEG converter

Aug 22nd, 2025 (edited)

#!/usr/bin/env python3
# EDF2EEG_converter.py
# EDF → BrainVision (.vhdr/.vmrk) + cross-check report (header-level always; signal-level if mne available)
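#
# Example invocations (paths here are illustrative, not from the original paste):
#   python EDF2EEG_converter.py --edf recording.edf --write --outbase ./subject01
#   python EDF2EEG_converter.py --edf recording.edf --vhdr subject01.vhdr --vmrk subject01.vmrk --eeg subject01.eeg
# The first form writes a fresh .vhdr/.vmrk pair next to the report; the second
# only cross-checks an existing BrainVision triplet against the EDF header.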

import argparse, json, datetime, re
from pathlib import Path
from configparser import ConfigParser

# ------------------------
# Utilities
# ------------------------
def micround(fs: float) -> int:
    """SamplingInterval in microseconds (rounded int)."""
    return int(round(1_000_000.0 / float(fs)))

def now_iso():
    return datetime.datetime.now().isoformat(timespec="seconds")

def write_text(p: Path, s: str):
    p.write_text(s, encoding="utf-8")

def pretty_bool(b: bool) -> str:
    return "PASS" if b else "FAIL"

# ------------------------
# Minimal EDF header reader (no 3rd party deps)
# ------------------------
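# EDF fixed header layout (256 bytes), per the EDF specification and as read below:
#   [0:8]     version            [184:192] header size in bytes
#   [8:88]    patient id         [192:236] reserved
#   [88:168]  recording id       [236:244] number of data records
#   [168:176] start date         [244:252] duration of a data record (s)
#   [176:184] start time         [252:256] number of signals (ns)
# It is followed by ns*256 bytes of per-signal fields, each field concatenated
# across channels: 16-byte label, 80-byte transducer, 8-byte physical dimension,
# 8-byte phys min/max, 8-byte dig min/max, 80-byte prefiltering, 8-byte samples
# per record, and 32-byte reserved.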
class EDFHeader:
    def __init__(self, path: Path):
        self.path = path
        self.n_signals = None
        self.n_records = None
        self.duration_record_s = None
        self.labels = []
        self.transducer = []
        self.phys_dim = []
        self.phys_min = []
        self.phys_max = []
        self.dig_min = []
        self.dig_max = []
        self.prefilt = []
        self.samples_per_record = []
        self.header_bytes = None
        self.valid = False

    def read(self):
        with self.path.open("rb") as f:
            head0 = f.read(256)
            if len(head0) < 256:
                raise ValueError("EDF header too short.")

            # Bytes 252-256: Number of signals (ns)
            self.n_signals = int(head0[252:256].decode("ascii", "ignore").strip() or "0")
            if self.n_signals <= 0 or self.n_signals > 512:
                raise ValueError(f"EDF ns invalid: {self.n_signals}")

            # Next sections are 256 bytes * ns, each field concatenated across channels
            def read_field(field_len):
                return f.read(field_len * self.n_signals)

            labels_raw      = read_field(16)
            transducer_raw  = read_field(80)
            phys_dim_raw    = read_field(8)
            phys_min_raw    = read_field(8)
            phys_max_raw    = read_field(8)
            dig_min_raw     = read_field(8)
            dig_max_raw     = read_field(8)
            prefilt_raw     = read_field(80)
            spr_raw         = read_field(8)

            # Back to head0 to get number of records and duration
            n_records      = int(head0[236:244].decode("ascii", "ignore").strip() or "-1")
            dur_per_record = float(head0[244:252].decode("ascii", "ignore").strip() or "0")
            self.n_records = n_records
            self.duration_record_s = dur_per_record

            # Helper to split per-signal strings
            def split_arr(raw, step, cast=str, strip=True):
                arr = []
                for i in range(self.n_signals):
                    chunk = raw[i*step:(i+1)*step].decode("ascii", "ignore")
                    if strip: chunk = chunk.strip()
                    if cast is float:
                        try: arr.append(float(chunk))
                        except ValueError: arr.append(float("nan"))
                    elif cast is int:
                        try: arr.append(int(chunk))
                        except ValueError: arr.append(0)
                    else:
                        arr.append(chunk)
                return arr

            self.labels      = split_arr(labels_raw, 16, str)
            self.transducer  = split_arr(transducer_raw, 80, str)
            self.phys_dim    = split_arr(phys_dim_raw, 8, str)
            self.phys_min    = split_arr(phys_min_raw, 8, float)
            self.phys_max    = split_arr(phys_max_raw, 8, float)
            self.dig_min     = split_arr(dig_min_raw, 8, int)
            self.dig_max     = split_arr(dig_max_raw, 8, int)
            self.prefilt     = split_arr(prefilt_raw, 80, str)
            self.samples_per_record = split_arr(spr_raw, 8, int)

            # Compute fs per channel
            self.fs_per_channel = []
            for spr in self.samples_per_record:
                fs = spr / self.duration_record_s if self.duration_record_s > 0 else 0.0
                self.fs_per_channel.append(fs)

            self.header_bytes = 256 + self.n_signals * 256
            self.valid = True
            return self

    def common_fs(self):
        # Returns (fs, ok_equal) where ok_equal=True if all channels agree
        if not self.valid: return (None, False)
        uniq = {round(fs, 6) for fs in self.fs_per_channel}
        if len(uniq) == 1:
            return (float(next(iter(uniq))), True)
        # If multiple rates are present, take the most common one
        from collections import Counter
        c = Counter(self.fs_per_channel)
        fs = max(c.items(), key=lambda kv: kv[1])[0]
        return (float(fs), False)

    def total_samples_per_channel(self):
        # total = n_records * samples_per_record[ch]
        if not self.valid: return None
        return [self.n_records * spr for spr in self.samples_per_record]
# ------------------------
# BrainVision parsers
# ------------------------
def parse_vhdr(path: Path):
    # preserve case, allow value-less lines (e.g. free text in a [Comment] section),
    # and disable %-interpolation
    cfg = ConfigParser(allow_no_value=True, interpolation=None)
    cfg.optionxform = str
    text = path.read_text(encoding="utf-8", errors="ignore")
    # BrainVision headers open with an identification line before the first
    # [section]; drop anything ahead of it so ConfigParser accepts the file.
    first_section = text.find("[")
    if first_section > 0:
        text = text[first_section:]
    cfg.read_string(text)
    out = {"ok": True, "errors": [], "path": str(path)}
    try:
        ci = cfg["Common Infos"]
        bi = cfg["Binary Infos"]

        out["DataFile"] = ci.get("DataFile", "").strip()
        out["MarkerFile"] = ci.get("MarkerFile", "").strip()
        out["DataFormat"] = ci.get("DataFormat", "").strip()
        out["DataOrientation"] = ci.get("DataOrientation", "").strip()
        out["NumberOfChannels"] = int(ci.get("NumberOfChannels", "0").strip() or "0")
        out["SamplingInterval_us"] = float(ci.get("SamplingInterval", "0").strip() or "0")
        out["UseBigEndianOrder"] = ci.get("UseBigEndianOrder", "NO").strip().upper() == "YES"

        out["BinaryFormat"] = bi.get("BinaryFormat", "").strip()

        # Channel lines: Ch1=Fp1,,0.195,uV
        chans = []
        for k, v in cfg.items("Channel Infos"):
            if not k.lower().startswith("ch"): continue
            parts = [p.strip() for p in (v or "").split(",")]
            label = parts[0] if len(parts) > 0 else ""
            ref   = parts[1] if len(parts) > 1 else ""
            res   = parts[2] if len(parts) > 2 else ""
            unit  = parts[3] if len(parts) > 3 else ""
            try:
                res_val = float(res)
            except ValueError:
                res_val = None
            chans.append({"label": label, "ref": ref, "resolution_uV_per_bit": res_val, "unit": unit})
        out["channels"] = chans
    except Exception as e:
        out["ok"] = False
        out["errors"].append(f"VHDR parse error: {e}")
    return out

def parse_vmrk(path: Path):
    out = {"ok": True, "errors": [], "path": str(path), "markers": [], "DataFile": None}
    try:
        lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
        in_marker = False
        for ln in lines:
            if ln.strip().startswith("[Common Infos]"):
                in_marker = False
            if ln.strip().startswith("[Marker Infos]"):
                in_marker = True
                continue
            if not in_marker:
                if ln.startswith("DataFile="):
                    out["DataFile"] = ln.split("=", 1)[1].strip()
                continue
            if ln.startswith("Mk"):
                # Mk<n>=Type,Description,Position (samples),Size (samples),Channel
                _, rhs = ln.split("=", 1)
                parts = [p.strip() for p in rhs.split(",")]
                if len(parts) >= 5:
                    out["markers"].append({
                        "type": parts[0], "desc": parts[1],
                        "latency_samples": int(re.sub(r"[^\d]", "", parts[2]) or "0"),
                        "duration_samples": int(re.sub(r"[^\d]", "", parts[3]) or "0"),
                        "channel": int(re.sub(r"[^\d]", "", parts[4]) or "0")
                    })
    except Exception as e:
        out["ok"] = False
        out["errors"].append(f"VMRK parse error: {e}")
    return out

# ------------------------
# Raw .eeg binary sanity
# ------------------------
def expected_eeg_bytes(n_ch: int, total_samples_per_ch: int, binary_format: str) -> int:
    bps = 2 if binary_format.upper() == "INT_16" else 4 if "32" in binary_format else None
    if bps is None:
        return -1
    return n_ch * total_samples_per_ch * bps
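# Illustrative sizing (hypothetical numbers): 19 channels recorded for 300 s at
# 256 Hz give 76_800 samples per channel, so an INT_16 .eeg file should be
# 19 * 76_800 * 2 = 2_918_400 bytes.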

# ------------------------
# Writer: BrainVision files
# ------------------------
def synthesize_vhdr(base: Path, data_filename: str, marker_filename: str,
                    labels, fs: float, resolution_uV=0.195, binary_format="INT_16"):
    N = len(labels)
    vhdr = []
    vhdr.append("Brain Vision Data Exchange Header File Version 1.0\n")
    vhdr.append("[Common Infos]\n")
    vhdr.append("Codepage=ANSI\n")
    vhdr.append(f"DataFile={data_filename}\n")
    vhdr.append(f"MarkerFile={marker_filename}\n")
    vhdr.append("DataFormat=BINARY\n")
    vhdr.append("DataOrientation=MULTIPLEXED\n")
    vhdr.append(f"NumberOfChannels={N}\n")
    vhdr.append(f"SamplingInterval={micround(fs)}\n")
    vhdr.append("UseBigEndianOrder=NO\n\n")
    vhdr.append("[Binary Infos]\n")
    vhdr.append(f"BinaryFormat={binary_format}\n\n")
    vhdr.append("[Channel Infos]\n")
    for i, lab in enumerate(labels, start=1):
        vhdr.append(f"Ch{i}={lab},,{resolution_uV},uV\n")
    write_text(base.with_suffix(".vhdr"), "".join(vhdr))

def synthesize_vmrk(base: Path, data_filename: str, total_samples: int, extra_markers=None):
    vmrk = []
    vmrk.append("Brain Vision Data Exchange Marker File, Version 1.0\n\n")
    vmrk.append("[Common Infos]\n")
    vmrk.append("Codepage=ANSI\n")
    vmrk.append(f"DataFile={data_filename}\n\n")
    vmrk.append("[Marker Infos]\n")
    # Marker fields follow the BrainVision order: Type,Description,Position,Size,Channel
    vmrk.append("Mk1=New Segment,,1,0,0\n")
    vmrk.append("Mk2=Recording Start,,1,0,0\n")
    idx = 3
    if extra_markers:
        for m in extra_markers:
            vmrk.append(f"Mk{idx}={m['type']},{m['desc']},{m['latency_samples']},{m.get('duration_samples',0)},{m.get('channel',0)}\n")
            idx += 1
    vmrk.append(f"Mk{idx}=Recording End,,{total_samples},0,0\n")
    write_text(base.with_suffix(".vmrk"), "".join(vmrk))

# ------------------------
# Cross-checks (Gunkelman checks)
# ------------------------
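# Header-level checks performed below: sampling-rate match, channel-count match,
# MULTIPLEXED orientation, BINARY data format, expected .eeg byte size, and
# channel-label set/order agreement between the EDF and the BrainVision files.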
def cross_checks(edf: EDFHeader, vhdr: dict|None, vmrk: dict|None, eeg_path: Path|None, force_fs=None, force_labels=None):
    report = {
        "timestamp": now_iso(),
        "edf_path": str(edf.path) if edf else None,
        "vhdr_path": vhdr["path"] if vhdr else None,
        "vmrk_path": vmrk["path"] if vmrk else None,
        "eeg_path": str(eeg_path) if eeg_path else None,
        "checks": [],
        "summary": {},
        "advice": []
    }

    # EDF basics
    edf_ok = edf is not None and edf.valid
    fs_edf, fs_equal = (None, False)
    total_samples_ch = None
    labels_edf = None
    if edf_ok:
        fs_edf, fs_equal = edf.common_fs()
        total_samples_ch = edf.total_samples_per_channel()
        labels_edf = edf.labels
        report["summary"]["edf_fs"] = fs_edf
        report["summary"]["edf_fs_channels_equal"] = fs_equal
        report["summary"]["edf_n_signals"] = edf.n_signals
        report["summary"]["edf_duration_s"] = edf.n_records * edf.duration_record_s
        report["summary"]["edf_total_samples_per_ch"] = total_samples_ch

    # VHDR basics
    if vhdr:
        interval_vhdr_us = float(vhdr.get("SamplingInterval_us", 0) or 0)
        try:
            fs_vhdr = 1_000_000.0 / interval_vhdr_us
        except ZeroDivisionError:
            fs_vhdr = 0.0
        n_vhdr = vhdr.get("NumberOfChannels")
        dataformat = vhdr.get("DataFormat", "").upper()
        dataorient = vhdr.get("DataOrientation", "").upper()
        binfmt = vhdr.get("BinaryFormat", "").upper()
        labels_vhdr = [c["label"] for c in vhdr.get("channels", [])]

        report["summary"]["vhdr_fs"] = fs_vhdr
        report["summary"]["vhdr_n_channels"] = n_vhdr
        report["summary"]["vhdr_binfmt"] = binfmt
        report["summary"]["vhdr_dataformat"] = dataformat
        report["summary"]["vhdr_dataorientation"] = dataorient
        report["summary"]["vhdr_labels"] = labels_vhdr

        # VHDR stores the sampling interval as integer microseconds, so compare at
        # that resolution rather than exact Hz (e.g. 256 Hz -> 3906 µs, not 3906.25).
        fs_pass = bool(edf_ok and fs_edf and round(interval_vhdr_us) == micround(fs_edf))
        report["checks"].append({
            "name": "FS match (EDF vs VHDR)",
            "expected": fs_edf,
            "observed": fs_vhdr,
            "pass": fs_pass
        })
        report["checks"].append({
            "name": "N channels match (EDF vs VHDR)",
            "expected": edf.n_signals if edf_ok else None,
            "observed": n_vhdr,
            "pass": (edf_ok and (n_vhdr == edf.n_signals))
        })
        report["checks"].append({
            "name": "Orientation MULTIPLEXED",
            "expected": "MULTIPLEXED",
            "observed": dataorient,
            "pass": (dataorient == "MULTIPLEXED")
        })
        report["checks"].append({
            "name": "DataFormat BINARY",
            "expected": "BINARY",
            "observed": dataformat,
            "pass": (dataformat == "BINARY")
        })

    # EEG binary size check (if we have both counts and the .eeg)
    if eeg_path and eeg_path.exists() and edf_ok:
        file_bytes = eeg_path.stat().st_size
        # Assume INT_16 unless vhdr says otherwise
        binfmt = (vhdr.get("BinaryFormat", "INT_16") if vhdr else "INT_16").upper()
        # If EDF per-channel sample counts are equal, take the first
        total_samps = total_samples_ch[0] if total_samples_ch else 0
        expected = expected_eeg_bytes(edf.n_signals, total_samps, binfmt)
        report["checks"].append({
            "name": "EEG binary size matches expected",
            "expected_bytes": expected,
            "observed_bytes": file_bytes,
            "pass": (expected == file_bytes)
        })
        if expected != file_bytes:
            report["advice"].append(
                f"Binary size mismatch: expected {expected} bytes from EDF, got {file_bytes}. "
                f"Check BinaryFormat ({binfmt}), channel count, or total samples."
            )

    # Labels check (order)
    if vhdr and edf_ok:
        labels_vhdr = [c["label"] for c in vhdr.get("channels", [])]
        same_len = len(labels_vhdr) == len(labels_edf)
        same_set = set(l.upper() for l in labels_vhdr) == set(l.upper() for l in labels_edf)
        report["checks"].append({
            "name": "Channel labels set match",
            "expected_set": labels_edf,
            "observed_set": labels_vhdr,
            "pass": same_set
        })
        if same_set and not same_len:
            report["advice"].append("Channel label sets match but lengths differ. Investigate duplicates or extra channels.")
        # Order differences matter clinically
        if same_set and labels_vhdr != labels_edf:
            report["advice"].append("Channel labels are same set but different order; ensure multiplex order matches BrainVision expectations.")

    # Summary confidence (simple)
    passed = sum(1 for c in report["checks"] if c["pass"])
    total  = len(report["checks"])
    confidence = (passed / total) if total else 0.0
    report["summary"]["confidence"] = round(confidence, 3)
    return report

# ------------------------
# Optional signal-level QC (uses MNE if present)
# ------------------------
def signal_qc_with_mne(edf_path: Path, fs_expect=None):
    try:
        import mne, numpy as np
        raw = mne.io.read_raw_edf(str(edf_path), preload=True, verbose=False)
        fs = raw.info["sfreq"]
        ch_names = raw.ch_names
        out = {"ok": True, "fs": float(fs), "alpha_peak_hz": None, "mains_hz": None, "blink_polarity_ok": None, "notes": []}

        def psd_welch(obj, **kw):
            # mne.time_frequency.psd_welch was removed from newer MNE releases;
            # fall back to the Spectrum API when the old function is unavailable.
            if hasattr(mne.time_frequency, "psd_welch"):
                return mne.time_frequency.psd_welch(obj, **kw)
            spec = obj.compute_psd(method="welch", **kw)
            return spec.get_data(return_freqs=True)

        # Alpha at O1/O2 if present
        picks = [name for name in ["O1", "O2", "Oz", "POz"] if name in ch_names]
        if picks:
            psds, freqs = psd_welch(raw.copy().pick(picks), fmin=2, fmax=40, n_fft=4096, verbose=False)
            mean_psd = psds.mean(axis=0)
            idx = (freqs >= 7) & (freqs <= 13)
            if np.any(idx):
                alpha_f = float(freqs[idx][np.argmax(mean_psd[idx])])
                out["alpha_peak_hz"] = alpha_f

        # Mains 50/60 Hz
        psd_all, f_all = psd_welch(raw, fmin=40, fmax=70, n_fft=4096, verbose=False)
        peak_f = f_all[psd_all.mean(axis=0).argmax()]
        if 49 <= peak_f <= 51: out["mains_hz"] = 50.0
        elif 59 <= peak_f <= 61: out["mains_hz"] = 60.0
        else: out["mains_hz"] = float(peak_f)

        # Blink polarity: Fp1/Fp2 if present; expect positive deflection (minus-up displays)
        for cand in [("Fp1", "Fp2"), ("AFz", "Fpz")]:
            if all(c in ch_names for c in cand):
                data = raw.copy().pick(list(cand)).filter(0.1, 3.0, verbose=False).get_data()
                # Crude blink detector: large positive peaks should outweigh troughs
                if data.shape[1] > 0:
                    peak = np.percentile(data[0], 99)
                    trough = np.percentile(data[0], 1)
                    out["blink_polarity_ok"] = bool(peak > abs(trough))
                break

        if fs_expect and abs(fs - fs_expect) > 1e-6:
            out["notes"].append(f"Fs mismatch: EDF {fs} vs expected {fs_expect}")

        return out
    except Exception as e:
        return {"ok": False, "error": str(e)}

# ------------------------
# Main
# ------------------------
def main():
    ap = argparse.ArgumentParser(description="EDF → BrainVision converter with Gunkelman cross-checks")
    ap.add_argument("--edf", type=Path, required=True, help="Input EDF file")
    ap.add_argument("--vhdr", type=Path, help="Optional existing .vhdr to validate")
    ap.add_argument("--vmrk", type=Path, help="Optional existing .vmrk to validate")
    ap.add_argument("--eeg",  type=Path, help="Optional existing .eeg binary for size cross-check")
    ap.add_argument("--outbase", type=Path, help="Base path for synthesized VHDR/VMRK (e.g., /path/to/subject01)")
    ap.add_argument("--resolution", type=float, default=0.195, help="µV/bit resolution to write in VHDR")
    ap.add_argument("--binfmt", type=str, default="INT_16", choices=["INT_16", "FLOAT_32"], help="BinaryFormat in VHDR")
    ap.add_argument("--write", action="store_true", help="Write .vhdr/.vmrk from EDF")
    args = ap.parse_args()

    edf = EDFHeader(args.edf).read()
    vhdr = parse_vhdr(args.vhdr) if args.vhdr else None
    vmrk = parse_vmrk(args.vmrk) if args.vmrk else None

    report = cross_checks(edf, vhdr, vmrk, args.eeg)
    # Optional signal QC (only if MNE available)
    sig_qc = signal_qc_with_mne(args.edf, fs_expect=report["summary"].get("edf_fs"))
    if sig_qc.get("ok"):
        report["signal_qc"] = sig_qc

    outdir = (args.outbase.parent if args.outbase else Path.cwd())
    outstem = (args.outbase.name if args.outbase else args.edf.stem)
    report_txt = outdir / f"{outstem}__gunkelman_report.txt"
    report_json = outdir / f"{outstem}__gunkelman_report.json"

    # Human-readable report
    lines = []
    lines.append(f"# Gunkelman Cross-Check Report\nGenerated: {report['timestamp']}\n")
    lines.append(f"EDF:  {report['edf_path']}")
    if report.get("vhdr_path"): lines.append(f"VHDR: {report['vhdr_path']}")
    if report.get("vmrk_path"): lines.append(f"VMRK: {report['vmrk_path']}")
    if report.get("eeg_path"):  lines.append(f"EEG:  {report['eeg_path']}")
    lines.append("")

    # Summary
    s = report["summary"]
    lines.append("## Summary")
    for k, v in s.items():
        lines.append(f"- {k}: {v}")
    lines.append("")
    # Checks
    lines.append("## Checks")
    for c in report["checks"]:
        lines.append(f"- {c['name']}: {pretty_bool(c['pass'])}  (expected: {c.get('expected', c.get('expected_bytes'))}, observed: {c.get('observed', c.get('observed_bytes'))})")
    # Advice
    if report["advice"]:
        lines.append("\n## Advice")
        for a in report["advice"]:
            lines.append(f"- {a}")

    # Signal QC
    if sig_qc.get("ok"):
        lines.append("\n## Signal QC (MNE)")
        lines.append(f"- fs detected: {sig_qc.get('fs')}")
        if sig_qc.get("alpha_peak_hz") is not None:
            lines.append(f"- alpha peak (O1/O2): {round(sig_qc['alpha_peak_hz'], 2)} Hz")
        if sig_qc.get("mains_hz") is not None:
            lines.append(f"- mains peak: {sig_qc['mains_hz']} Hz")
        if sig_qc.get("blink_polarity_ok") is not None:
            lines.append(f"- blink polarity ok: {sig_qc['blink_polarity_ok']}")
        if sig_qc.get("notes"):
            for n in sig_qc["notes"]:
                lines.append(f"- note: {n}")
    else:
        lines.append("\n## Signal QC")
        lines.append(f"- skipped or failed: {sig_qc.get('error', 'no MNE installed')}")

    write_text(report_txt, "\n".join(lines))
    write_text(report_json, json.dumps(report, indent=2))

    print(f"[OK] Wrote report:\n  {report_txt}\n  {report_json}")

    # Write new VHDR/VMRK from EDF (if requested)
    if args.write:
        labels = edf.labels
        fs, ok = edf.common_fs()
        if not ok:
            print("[WARN] EDF channels have differing fs; using the most common rate.")
        total_samples = edf.total_samples_per_channel()[0]
        base = args.outbase if args.outbase else (outdir / outstem)
        data_filename = f"{base.name}.eeg"
        marker_filename = f"{base.name}.vmrk"

        synthesize_vhdr(base, data_filename, marker_filename, labels, fs, resolution_uV=args.resolution, binary_format=args.binfmt)
        synthesize_vmrk(base, data_filename, total_samples, extra_markers=None)
        print(f"[OK] Wrote VHDR/VMRK next to report using fs={fs}, N={len(labels)}, Resolution={args.resolution} µV/bit, BinaryFormat={args.binfmt}")
        print(f"     {base.with_suffix('.vhdr')}")
        print(f"     {base.with_suffix('.vmrk')}")
        print("NOTE: This writes headers only. Ensure your .eeg binary matches EDF sample count & BinaryFormat (INT_16 recommended).")

if __name__ == "__main__":
    main()