#!/usr/bin/env python3
# EDF2EEG_converter.py
# EDF → BrainVision (.vhdr/.vmrk) + cross-check report (header-level always; signal-level if MNE available)
import argparse, json, datetime, re
from pathlib import Path
from configparser import ConfigParser

# ------------------------
# Utilities
# ------------------------
def micround(fs: float) -> int:
    """SamplingInterval in microseconds (rounded int)."""
    return int(round(1_000_000.0 / float(fs)))

def now_iso():
    return datetime.datetime.now().isoformat(timespec="seconds")

def write_text(p: Path, s: str):
    p.write_text(s, encoding="utf-8")

def pretty_bool(b: bool) -> str:
    return "PASS" if b else "FAIL"
# ------------------------
# Minimal EDF header reader (no 3rd-party deps)
# ------------------------
class EDFHeader:
    def __init__(self, path: Path):
        self.path = path
        self.n_signals = None
        self.n_records = None
        self.duration_record_s = None
        self.labels = []
        self.transducer = []
        self.phys_dim = []
        self.phys_min = []
        self.phys_max = []
        self.dig_min = []
        self.dig_max = []
        self.prefilt = []
        self.samples_per_record = []
        self.header_bytes = None
        self.valid = False

    def read(self):
        with self.path.open("rb") as f:
            head0 = f.read(256)
            if len(head0) < 256:
                raise ValueError("EDF header too short.")
            # Bytes 252-256: number of signals (ns)
            self.n_signals = int(head0[252:256].decode("ascii", "ignore").strip() or "0")
            if self.n_signals <= 0 or self.n_signals > 512:
                raise ValueError(f"EDF ns invalid: {self.n_signals}")

            # The per-signal section is ns * 256 bytes; each field is concatenated across channels.
            def read_field(field_len):
                return f.read(field_len * self.n_signals)

            labels_raw = read_field(16)
            transducer_raw = read_field(80)
            phys_dim_raw = read_field(8)
            phys_min_raw = read_field(8)
            phys_max_raw = read_field(8)
            dig_min_raw = read_field(8)
            dig_max_raw = read_field(8)
            prefilt_raw = read_field(80)
            spr_raw = read_field(8)

            # Back to head0 for the number of records and record duration
            n_records = int(head0[236:244].decode("ascii", "ignore").strip() or "-1")
            dur_per_record = float(head0[244:252].decode("ascii", "ignore").strip() or "0")
            self.n_records = n_records
            self.duration_record_s = dur_per_record

            # Helper to split the concatenated per-signal strings
            def split_arr(raw, step, cast=str, strip=True):
                arr = []
                for i in range(self.n_signals):
                    chunk = raw[i * step:(i + 1) * step].decode("ascii", "ignore")
                    if strip:
                        chunk = chunk.strip()
                    if cast is float:
                        try:
                            arr.append(float(chunk))
                        except ValueError:
                            arr.append(float("nan"))
                    elif cast is int:
                        try:
                            arr.append(int(chunk))
                        except ValueError:
                            arr.append(0)
                    else:
                        arr.append(chunk)
                return arr

            self.labels = split_arr(labels_raw, 16, str)
            self.transducer = split_arr(transducer_raw, 80, str)
            self.phys_dim = split_arr(phys_dim_raw, 8, str)
            self.phys_min = split_arr(phys_min_raw, 8, float)
            self.phys_max = split_arr(phys_max_raw, 8, float)
            self.dig_min = split_arr(dig_min_raw, 8, int)
            self.dig_max = split_arr(dig_max_raw, 8, int)
            self.prefilt = split_arr(prefilt_raw, 80, str)
            self.samples_per_record = split_arr(spr_raw, 8, int)

            # Sampling rate per channel: samples per record / record duration (s)
            self.fs_per_channel = []
            for spr in self.samples_per_record:
                fs = spr / self.duration_record_s if self.duration_record_s > 0 else 0.0
                self.fs_per_channel.append(fs)

            self.header_bytes = 256 + self.n_signals * 256
            self.valid = True
        return self

    def common_fs(self):
        # Returns (fs, ok_equal) where ok_equal=True if all channels agree.
        if not self.valid:
            return (None, False)
        uniq = {round(fs, 6) for fs in self.fs_per_channel}
        if len(uniq) == 1:
            return (float(next(iter(uniq))), True)
        # If rates differ across channels, take the mode.
        from collections import Counter
        c = Counter(self.fs_per_channel)
        fs = max(c.items(), key=lambda kv: kv[1])[0]
        return (float(fs), False)

    def total_samples_per_channel(self):
        # total = n_records * samples_per_record[ch]
        if not self.valid:
            return None
        return [self.n_records * spr for spr in self.samples_per_record]
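
# Example usage of EDFHeader (a minimal sketch; "subject01.edf" is a hypothetical path):
#
#   hdr = EDFHeader(Path("subject01.edf")).read()
#   fs, fs_equal = hdr.common_fs()
#   print(hdr.n_signals, fs, hdr.labels[:4])
#
# For a 19-channel, 256 Hz recording this would print something like
# "19 256.0 ['Fp1', 'Fp2', 'F3', 'F4']", depending on the labels stored in the EDF.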
# ------------------------
# BrainVision parsers
# ------------------------
def parse_vhdr(path: Path):
    cfg = ConfigParser()
    # Preserve key case (Ch1, Ch2, ...)
    cfg.optionxform = str
    text = path.read_text(encoding="utf-8", errors="ignore")
    # Real .vhdr files begin with a "Brain Vision Data Exchange Header File ..." preamble
    # before the first [Section]; ConfigParser raises MissingSectionHeaderError on it,
    # so drop everything before the first section header.
    first_section = text.find("[")
    if first_section > 0:
        text = text[first_section:]
    cfg.read_string(text)
    out = {"ok": True, "errors": [], "path": str(path)}
    try:
        ci = cfg["Common Infos"]
        bi = cfg["Binary Infos"]
        out["DataFile"] = ci.get("DataFile", "").strip()
        out["MarkerFile"] = ci.get("MarkerFile", "").strip()
        out["DataFormat"] = ci.get("DataFormat", "").strip()
        out["DataOrientation"] = ci.get("DataOrientation", "").strip()
        out["NumberOfChannels"] = int(ci.get("NumberOfChannels", "0").strip() or "0")
        out["SamplingInterval_us"] = float(ci.get("SamplingInterval", "0").strip() or "0")
        out["UseBigEndianOrder"] = ci.get("UseBigEndianOrder", "NO").strip().upper() == "YES"
        out["BinaryFormat"] = bi.get("BinaryFormat", "").strip()
        # Channel lines look like: Ch1=Fp1,,0.195,uV  (label, reference, resolution, unit)
        chans = []
        for k, v in cfg.items("Channel Infos"):
            if not k.lower().startswith("ch"):
                continue
            parts = [p.strip() for p in v.split(",")]
            label = parts[0] if len(parts) > 0 else ""
            ref = parts[1] if len(parts) > 1 else ""
            res = parts[2] if len(parts) > 2 else ""
            unit = parts[3] if len(parts) > 3 else ""
            try:
                res_val = float(res)
            except ValueError:
                res_val = None
            chans.append({"label": label, "ref": ref, "resolution_uV_per_bit": res_val, "unit": unit})
        out["channels"] = chans
    except Exception as e:
        out["ok"] = False
        out["errors"].append(f"VHDR parse error: {e}")
    return out

def parse_vmrk(path: Path):
    out = {"ok": True, "errors": [], "path": str(path), "markers": [], "DataFile": None}
    try:
        lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
        in_marker = False
        for ln in lines:
            if ln.strip().startswith("[Common Infos]"):
                in_marker = False
            if ln.strip().startswith("[Marker Infos]"):
                in_marker = True
                continue
            if not in_marker:
                if ln.startswith("DataFile="):
                    out["DataFile"] = ln.split("=", 1)[1].strip()
                continue
            if ln.startswith("Mk"):
                # Marker lines follow: Mk<n>=<type>,<description>,<position in data points>,<size in data points>,<channel>
                _, rhs = ln.split("=", 1)
                parts = [p.strip() for p in rhs.split(",")]
                if len(parts) >= 5:
                    out["markers"].append({
                        "type": parts[0], "desc": parts[1],
                        "latency_samples": int(re.sub(r"[^\d]", "", parts[2]) or "0"),
                        "duration_samples": int(re.sub(r"[^\d]", "", parts[3]) or "0"),
                        "channel": int(re.sub(r"[^\d]", "", parts[4]) or "0")
                    })
    except Exception as e:
        out["ok"] = False
        out["errors"].append(f"VMRK parse error: {e}")
    return out
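
# Example (illustrative; the sidecar paths are hypothetical):
#
#   vhdr = parse_vhdr(Path("subject01.vhdr"))
#   vmrk = parse_vmrk(Path("subject01.vmrk"))
#   print(vhdr["NumberOfChannels"], vhdr["SamplingInterval_us"], len(vmrk["markers"]))
#
# Both parsers collect per-file problems in their "errors" lists, so most malformed
# fields end up in the report instead of aborting the run.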
# ------------------------
# Raw .eeg binary sanity
# ------------------------
def expected_eeg_bytes(n_ch: int, total_samples_per_ch: int, binary_format: str) -> int:
    bps = 2 if binary_format.upper() == "INT_16" else 4 if "32" in binary_format else None
    if bps is None:
        return -1
    return n_ch * total_samples_per_ch * bps
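
# Worked example of the size arithmetic (hypothetical 10-minute recording):
# 19 channels at 256 Hz for 600 s -> 153,600 samples per channel, so an INT_16 .eeg
# should be 19 * 153,600 * 2 = 5,836,800 bytes (and twice that for FLOAT_32).
#
#   expected_eeg_bytes(19, 153_600, "INT_16")    # -> 5_836_800
#   expected_eeg_bytes(19, 153_600, "FLOAT_32")  # -> 11_673_600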
# ------------------------
# Writer: BrainVision files
# ------------------------
def synthesize_vhdr(base: Path, data_filename: str, marker_filename: str,
                    labels, fs: float, resolution_uV=0.195, binary_format="INT_16"):
    N = len(labels)
    vhdr = []
    vhdr.append("Brain Vision Data Exchange Header File Version 1.0\n")
    vhdr.append("[Common Infos]\n")
    vhdr.append("Codepage=ANSI\n")
    vhdr.append(f"DataFile={data_filename}\n")
    vhdr.append(f"MarkerFile={marker_filename}\n")
    vhdr.append("DataFormat=BINARY\n")
    vhdr.append("DataOrientation=MULTIPLEXED\n")
    vhdr.append(f"NumberOfChannels={N}\n")
    vhdr.append(f"SamplingInterval={micround(fs)}\n")
    vhdr.append("UseBigEndianOrder=NO\n\n")
    vhdr.append("[Binary Infos]\n")
    vhdr.append(f"BinaryFormat={binary_format}\n\n")
    vhdr.append("[Channel Infos]\n")
    for i, lab in enumerate(labels, start=1):
        vhdr.append(f"Ch{i}={lab},,{resolution_uV},uV\n")
    write_text(base.with_suffix(".vhdr"), "".join(vhdr))

def synthesize_vmrk(base: Path, data_filename: str, total_samples: int, extra_markers=None):
    vmrk = []
    vmrk.append("Brain Vision Data Exchange Marker File, Version 1.0\n\n")
    vmrk.append("[Common Infos]\n")
    vmrk.append("Codepage=ANSI\n")
    vmrk.append(f"DataFile={data_filename}\n\n")
    vmrk.append("[Marker Infos]\n")
    # Marker fields: <type>,<description>,<position in data points>,<size in data points>,<channel>
    vmrk.append("Mk1=New Segment,,1,0,0\n")
    vmrk.append("Mk2=Recording Start,,1,0,0\n")
    idx = 3
    if extra_markers:
        for m in extra_markers:
            vmrk.append(f"Mk{idx}={m['type']},{m['desc']},{m['latency_samples']},{m.get('duration_samples', 0)},{m.get('channel', 0)}\n")
            idx += 1
    # Keep marker numbering sequential so strict readers don't reject the file.
    vmrk.append(f"Mk{idx}=Recording End,,{total_samples},0,0\n")
    write_text(base.with_suffix(".vmrk"), "".join(vmrk))
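
# Example header synthesis (a sketch; labels, rate, and base path are hypothetical):
#
#   base = Path("./out/subject01")
#   labels = ["Fp1", "Fp2", "O1", "O2"]
#   synthesize_vhdr(base, "subject01.eeg", "subject01.vmrk", labels, fs=256.0)
#   synthesize_vmrk(base, "subject01.eeg", total_samples=153_600)
#
# Only the .vhdr/.vmrk sidecars are written; the .eeg binary must be produced
# separately and must match the declared BinaryFormat and sample count.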
# ------------------------
# Cross-checks (Gunkelman checks)
# ------------------------
def cross_checks(edf: EDFHeader, vhdr: dict | None, vmrk: dict | None, eeg_path: Path | None, force_fs=None, force_labels=None):
    report = {
        "timestamp": now_iso(),
        "edf_path": str(edf.path) if edf else None,
        "vhdr_path": vhdr["path"] if vhdr else None,
        "vmrk_path": vmrk["path"] if vmrk else None,
        "eeg_path": str(eeg_path) if eeg_path else None,
        "checks": [],
        "summary": {},
        "advice": []
    }

    # EDF basics
    edf_ok = edf is not None and edf.valid
    fs_edf, fs_equal = (None, False)
    total_samples_ch = None
    labels_edf = None
    if edf_ok:
        fs_edf, fs_equal = edf.common_fs()
        total_samples_ch = edf.total_samples_per_channel()
        labels_edf = edf.labels
        report["summary"]["edf_fs"] = fs_edf
        report["summary"]["edf_fs_channels_equal"] = fs_equal
        report["summary"]["edf_n_signals"] = edf.n_signals
        report["summary"]["edf_duration_s"] = edf.n_records * edf.duration_record_s
        report["summary"]["edf_total_samples_per_ch"] = total_samples_ch

    # VHDR basics
    if vhdr:
        try:
            fs_vhdr = 1_000_000.0 / float(vhdr.get("SamplingInterval_us", 0) or 0)
        except ZeroDivisionError:
            fs_vhdr = 0.0
        n_vhdr = vhdr.get("NumberOfChannels")
        dataformat = vhdr.get("DataFormat", "").upper()
        dataorient = vhdr.get("DataOrientation", "").upper()
        binfmt = vhdr.get("BinaryFormat", "").upper()
        labels_vhdr = [c["label"] for c in vhdr.get("channels", [])]
        report["summary"]["vhdr_fs"] = fs_vhdr
        report["summary"]["vhdr_n_channels"] = n_vhdr
        report["summary"]["vhdr_binfmt"] = binfmt
        report["summary"]["vhdr_dataformat"] = dataformat
        report["summary"]["vhdr_dataorientation"] = dataorient
        report["summary"]["vhdr_labels"] = labels_vhdr
        # SamplingInterval is stored as rounded microseconds, so compare rates with a
        # small tolerance rather than requiring exact equality.
        report["checks"].append({
            "name": "FS match (EDF vs VHDR)",
            "expected": fs_edf,
            "observed": fs_vhdr,
            "pass": (edf_ok and abs(fs_vhdr - fs_edf) < 0.01)
        })
        report["checks"].append({
            "name": "N channels match (EDF vs VHDR)",
            "expected": edf.n_signals if edf_ok else None,
            "observed": n_vhdr,
            "pass": (edf_ok and (n_vhdr == edf.n_signals))
        })
        report["checks"].append({
            "name": "Orientation MULTIPLEXED",
            "expected": "MULTIPLEXED",
            "observed": dataorient,
            "pass": (dataorient == "MULTIPLEXED")
        })
        report["checks"].append({
            "name": "DataFormat BINARY",
            "expected": "BINARY",
            "observed": dataformat,
            "pass": (dataformat == "BINARY")
        })

    # EEG binary size check (needs both the EDF counts and the .eeg file)
    if eeg_path and eeg_path.exists() and edf_ok:
        file_bytes = eeg_path.stat().st_size
        # Assume INT_16 unless the VHDR says otherwise
        binfmt = (vhdr.get("BinaryFormat", "INT_16") if vhdr else "INT_16").upper()
        # If all EDF channels share a sample count, the first entry is representative
        total_samps = total_samples_ch[0] if total_samples_ch else 0
        expected = expected_eeg_bytes(edf.n_signals, total_samps, binfmt)
        report["checks"].append({
            "name": "EEG binary size matches expected",
            "expected_bytes": expected,
            "observed_bytes": file_bytes,
            "pass": (expected == file_bytes)
        })
        if expected != file_bytes:
            report["advice"].append(
                f"Binary size mismatch: expected {expected} bytes from EDF, got {file_bytes}. "
                f"Check BinaryFormat ({binfmt}), channel count, or total samples."
            )

    # Labels check (set and order)
    if vhdr and edf_ok:
        labels_vhdr = [c["label"] for c in vhdr.get("channels", [])]
        same_len = len(labels_vhdr) == len(labels_edf)
        same_set = {l.upper() for l in labels_vhdr} == {l.upper() for l in labels_edf}
        report["checks"].append({
            "name": "Channel labels set match",
            "expected_set": labels_edf,
            "observed_set": labels_vhdr,
            "pass": same_set
        })
        if same_set and not same_len:
            report["advice"].append("Channel label sets match but lengths differ. Investigate duplicates or extra channels.")
        # Order differences matter clinically
        if same_set and labels_vhdr != labels_edf:
            report["advice"].append("Channel labels are the same set but in a different order; ensure the multiplexed order matches BrainVision expectations.")

    # Summary confidence (fraction of checks passed)
    passed = sum(1 for c in report["checks"] if c["pass"])
    total = len(report["checks"])
    confidence = (passed / total) if total else 0.0
    report["summary"]["confidence"] = round(confidence, 3)
    return report
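
# The header-level checks can also be run programmatically (a sketch; paths are hypothetical):
#
#   edf = EDFHeader(Path("subject01.edf")).read()
#   vhdr = parse_vhdr(Path("subject01.vhdr"))
#   rep = cross_checks(edf, vhdr, None, Path("subject01.eeg"))
#   failed = [c["name"] for c in rep["checks"] if not c["pass"]]
#   print(rep["summary"]["confidence"], failed)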
# ------------------------
# Optional signal-level QC (uses MNE if present)
# ------------------------
def signal_qc_with_mne(edf_path: Path, fs_expect=None):
    try:
        import mne
        import numpy as np
        raw = mne.io.read_raw_edf(str(edf_path), preload=True, verbose=False)
        fs = raw.info["sfreq"]
        ch_names = raw.ch_names
        out = {"ok": True, "fs": float(fs), "alpha_peak_hz": None, "mains_hz": None, "blink_polarity_ok": None, "notes": []}

        def welch_psd(r, fmin, fmax):
            # MNE >= 1.2 removed psd_welch(); prefer Raw.compute_psd() when available.
            if hasattr(r, "compute_psd"):
                spec = r.compute_psd(method="welch", fmin=fmin, fmax=fmax, n_fft=4096, verbose=False)
                return spec.get_data(return_freqs=True)
            return mne.time_frequency.psd_welch(r, fmin=fmin, fmax=fmax, n_fft=4096, verbose=False)

        # Alpha peak over posterior channels, if present
        picks = [name for name in ["O1", "O2", "Oz", "POz"] if name in ch_names]
        if picks:
            psds, freqs = welch_psd(raw.copy().pick(picks), fmin=2, fmax=40)
            mean_psd = psds.mean(axis=0)
            idx = (freqs >= 7) & (freqs <= 13)
            if np.any(idx):
                out["alpha_peak_hz"] = float(freqs[idx][np.argmax(mean_psd[idx])])

        # Mains peak (50/60 Hz)
        psd_all, f_all = welch_psd(raw, fmin=40, fmax=70)
        peak_f = f_all[psd_all.mean(axis=0).argmax()]
        if 49 <= peak_f <= 51:
            out["mains_hz"] = 50.0
        elif 59 <= peak_f <= 61:
            out["mains_hz"] = 60.0
        else:
            out["mains_hz"] = float(peak_f)

        # Blink polarity: Fp1/Fp2 if present; expect positive deflections at the frontal poles
        for cand in [("Fp1", "Fp2"), ("AFz", "Fpz")]:
            if all(c in ch_names for c in cand):
                data = raw.copy().pick(list(cand)).filter(0.1, 3.0, verbose=False).get_data()
                # Crude blink detector: the largest peaks should exceed the largest troughs
                if data.shape[1] > 0:
                    peak = np.percentile(data[0], 99)
                    trough = np.percentile(data[0], 1)
                    out["blink_polarity_ok"] = bool(peak > abs(trough))
                break

        if fs_expect and abs(fs - fs_expect) > 1e-6:
            out["notes"].append(f"Fs mismatch: EDF {fs} vs expected {fs_expect}")
        return out
    except Exception as e:
        return {"ok": False, "error": str(e)}
# ------------------------
# Main
# ------------------------
def main():
    ap = argparse.ArgumentParser(description="EDF → BrainVision converter with Gunkelman cross-checks")
    ap.add_argument("--edf", type=Path, required=True, help="Input EDF file")
    ap.add_argument("--vhdr", type=Path, help="Optional existing .vhdr to validate")
    ap.add_argument("--vmrk", type=Path, help="Optional existing .vmrk to validate")
    ap.add_argument("--eeg", type=Path, help="Optional existing .eeg binary for size cross-check")
    ap.add_argument("--outbase", type=Path, help="Base path for synthesized VHDR/VMRK (e.g., /path/to/subject01)")
    ap.add_argument("--resolution", type=float, default=0.195, help="µV/bit resolution to write in VHDR")
    ap.add_argument("--binfmt", type=str, default="INT_16", choices=["INT_16", "FLOAT_32"], help="BinaryFormat in VHDR")
    ap.add_argument("--write", action="store_true", help="Write .vhdr/.vmrk from EDF")
    args = ap.parse_args()

    edf = EDFHeader(args.edf).read()
    vhdr = parse_vhdr(args.vhdr) if args.vhdr else None
    vmrk = parse_vmrk(args.vmrk) if args.vmrk else None
    report = cross_checks(edf, vhdr, vmrk, args.eeg)

    # Optional signal QC (only if MNE is available)
    sig_qc = signal_qc_with_mne(args.edf, fs_expect=report["summary"].get("edf_fs"))
    if sig_qc.get("ok"):
        report["signal_qc"] = sig_qc

    outdir = (args.outbase.parent if args.outbase else Path.cwd())
    outstem = (args.outbase.name if args.outbase else args.edf.stem)
    report_txt = outdir / f"{outstem}__gunkelman_report.txt"
    report_json = outdir / f"{outstem}__gunkelman_report.json"

    # Human-readable report
    lines = []
    lines.append(f"# Gunkelman Cross-Check Report\nGenerated: {report['timestamp']}\n")
    lines.append(f"EDF: {report['edf_path']}")
    if report.get("vhdr_path"):
        lines.append(f"VHDR: {report['vhdr_path']}")
    if report.get("vmrk_path"):
        lines.append(f"VMRK: {report['vmrk_path']}")
    if report.get("eeg_path"):
        lines.append(f"EEG: {report['eeg_path']}")
    lines.append("")

    # Summary
    s = report["summary"]
    lines.append("## Summary")
    for k, v in s.items():
        lines.append(f"- {k}: {v}")
    lines.append("")

    # Checks
    lines.append("## Checks")
    for c in report["checks"]:
        expected = c.get("expected", c.get("expected_bytes"))
        observed = c.get("observed", c.get("observed_bytes"))
        lines.append(f"- {c['name']}: {pretty_bool(c['pass'])} (expected: {expected}, observed: {observed})")

    # Advice
    if report["advice"]:
        lines.append("\n## Advice")
        for a in report["advice"]:
            lines.append(f"- {a}")

    # Signal QC
    if sig_qc.get("ok"):
        lines.append("\n## Signal QC (MNE)")
        lines.append(f"- fs detected: {sig_qc.get('fs')}")
        if sig_qc.get("alpha_peak_hz") is not None:
            lines.append(f"- alpha peak (posterior): {round(sig_qc['alpha_peak_hz'], 2)} Hz")
        if sig_qc.get("mains_hz") is not None:
            lines.append(f"- mains peak: {sig_qc['mains_hz']} Hz")
        if sig_qc.get("blink_polarity_ok") is not None:
            lines.append(f"- blink polarity ok: {sig_qc['blink_polarity_ok']}")
        for n in sig_qc.get("notes", []):
            lines.append(f"- note: {n}")
    else:
        lines.append("\n## Signal QC")
        lines.append(f"- skipped or failed: {sig_qc.get('error', 'MNE not installed')}")

    write_text(report_txt, "\n".join(lines))
    write_text(report_json, json.dumps(report, indent=2))
    print(f"[OK] Wrote report:\n  {report_txt}\n  {report_json}")

    # Write new VHDR/VMRK from EDF (if requested)
    if args.write:
        labels = edf.labels
        fs, ok = edf.common_fs()
        if not ok:
            print("[WARN] EDF channels have differing fs; using the mode.")
        total_samples = edf.total_samples_per_channel()[0]
        base = args.outbase if args.outbase else (outdir / outstem)
        data_filename = f"{base.name}.eeg"
        marker_filename = f"{base.name}.vmrk"
        synthesize_vhdr(base, data_filename, marker_filename, labels, fs, resolution_uV=args.resolution, binary_format=args.binfmt)
        synthesize_vmrk(base, data_filename, total_samples, extra_markers=None)
        print(f"[OK] Wrote VHDR/VMRK next to report using fs={fs}, N={len(labels)}, Resolution={args.resolution} µV/bit, BinaryFormat={args.binfmt}")
        print(f"  {base.with_suffix('.vhdr')}")
        print(f"  {base.with_suffix('.vmrk')}")
        print("NOTE: This writes headers only. Ensure your .eeg binary matches the EDF sample count & BinaryFormat (INT_16 recommended).")

if __name__ == "__main__":
    main()
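
# Example invocations (paths are hypothetical; options match the argparse definitions above):
#
#   # Validate an existing BrainVision triplet against its source EDF:
#   python3 EDF2EEG_converter.py --edf subject01.edf --vhdr subject01.vhdr \
#       --vmrk subject01.vmrk --eeg subject01.eeg
#
#   # Write fresh .vhdr/.vmrk headers from the EDF (header-only; no .eeg is written):
#   python3 EDF2EEG_converter.py --edf subject01.edf --outbase ./out/subject01 \
#       --write --binfmt INT_16 --resolution 0.195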