Created
March 17, 2026 19:20
-
-
Save h-mayorquin/d6ba91365dfaecbf1aab1dd85838c586 to your computer and use it in GitHub Desktop.
stub_open_ephys_binary
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /// script
# requires-python = ">=3.10"
# dependencies = [
#   "numpy",
# ]
# ///
"""
Generate a stubbed version of the OneBox NP2014 dataset for testing.

Takes the full recording and produces a 1-second stub with anonymized
serial numbers. The stub preserves the full directory structure, all
metadata files (settings.xml, structure.oebin, sync_messages.txt),
and real neural data for the first 30,000 samples.

Usage:
    uv run generate_stub.py 2025-11-17_10-48-37 stub/2025-11-17_10-48-37

The source data is the recording shared by SplintZ in SpikeInterface issue #4394.
"""

import json
import shutil
import sys
import xml.etree.ElementTree as ET
from pathlib import Path

import numpy as np

# Number of leading samples kept for every continuous stream.
SAMPLES = 30_000

# NP_PROBE attributes in settings.xml that are overwritten with "0".
SERIAL_NUMBER_ATTRS = [
    "probe_serial_number",
    "custom_probe_name",
    "headstage_serial_number",
    "bsc_serial_number",
    "bs_serial_number",
]


def _anonymize_settings(src_file: Path, dst_file: Path) -> None:
    """Write a copy of *src_file* with machine and probe identifiers blanked.

    The INFO/MACHINE element (when present) gets placeholder ``name`` and
    ``cpu_model`` values, and every attribute listed in SERIAL_NUMBER_ATTRS
    that exists on an NP_PROBE element is set to ``"0"``.
    """
    tree = ET.parse(str(src_file))
    root = tree.getroot()

    machine = root.find("INFO/MACHINE")
    if machine is not None:
        machine.set("name", "ANONYMOUS")
        machine.set("cpu_model", "Anonymous CPU")

    for np_probe in root.iter("NP_PROBE"):
        for attr in SERIAL_NUMBER_ATTRS:
            if attr in np_probe.attrib:
                np_probe.set(attr, "0")

    # Write UTF-8 explicitly so the file bytes and the XML declaration do not
    # depend on the locale of the machine that generates the stub.
    tree.write(str(dst_file), encoding="utf-8", xml_declaration=True)


def _stub_stream(src_stream: Path, dst_stream: Path, n_channels: int) -> None:
    """Copy the first SAMPLES samples of one continuous stream.

    Truncates ``continuous.dat`` plus the ``sample_numbers.npy`` and
    ``timestamps.npy`` arrays found in *src_stream* into *dst_stream*.
    """
    dst_stream.mkdir(parents=True, exist_ok=True)

    # Binary data: first SAMPLES
    bytes_per_sample = n_channels * 2  # int16
    with open(src_stream / "continuous.dat", "rb") as f:
        data = f.read(SAMPLES * bytes_per_sample)
    with open(dst_stream / "continuous.dat", "wb") as f:
        f.write(data)

    # Numpy files: first SAMPLES. Memory-map the source so that only the
    # leading entries are read instead of loading the whole array.
    for fname in ("sample_numbers.npy", "timestamps.npy"):
        head = np.array(np.load(src_stream / fname, mmap_mode="r")[:SAMPLES])
        np.save(dst_stream / fname, head)


def _save_empty_like(src_npy: Path, dst_npy: Path) -> None:
    """Save an empty 1-D array to *dst_npy* with the dtype stored in *src_npy*."""
    orig = np.load(src_npy)
    np.save(dst_npy, np.array([], dtype=orig.dtype))


def _stub_events(src_events: Path, dst_events: Path) -> None:
    """Mirror the events folder tree, replacing every .npy file by an empty one.

    Handles .npy files sitting directly inside an event folder as well as
    those one level deeper inside a sub-folder.
    """
    for events_dir in src_events.iterdir():
        if not events_dir.is_dir():
            continue
        for sub in events_dir.iterdir():
            if sub.is_dir():
                dst_sub = dst_events / events_dir.name / sub.name
                dst_sub.mkdir(parents=True, exist_ok=True)
                for npy_file in sub.glob("*.npy"):
                    _save_empty_like(npy_file, dst_sub / npy_file.name)
            elif sub.suffix == ".npy":
                dst_evt = dst_events / events_dir.name
                dst_evt.mkdir(parents=True, exist_ok=True)
                _save_empty_like(sub, dst_evt / sub.name)


# Parse arguments
if len(sys.argv) != 3:
    print(f"Usage: {sys.argv[0]} <source_session_folder> <destination_session_folder>")
    sys.exit(1)

src_session = Path(sys.argv[1])
dst_session = Path(sys.argv[2])

# Find all Record Node folders (they contain settings.xml)
record_nodes = [p.parent for p in sorted(src_session.glob("*/settings.xml"))]
if not record_nodes:
    print(f"No Record Node folders found in {src_session}")
    sys.exit(1)

# Create the destination only once the source is known to be usable, so a bad
# source path does not leave an empty output folder behind.
dst_session.mkdir(parents=True, exist_ok=True)

for src_node in record_nodes:
    node_name = src_node.name
    dst_node = dst_session / node_name
    dst_node.mkdir(parents=True, exist_ok=True)
    print(f"Stubbing {node_name}...")

    # Anonymize settings.xml (and settings_N.xml for extra experiments)
    for settings_file in src_node.glob("settings*.xml"):
        _anonymize_settings(settings_file, dst_node / settings_file.name)

    # Process each experiment/recording
    for oebin_file in sorted(src_node.rglob("structure.oebin")):
        src_rec = oebin_file.parent
        rel = src_rec.relative_to(src_node)
        dst_rec = dst_node / rel
        dst_rec.mkdir(parents=True, exist_ok=True)

        # Copy structure.oebin as-is (no sensitive data)
        shutil.copy2(oebin_file, dst_rec / "structure.oebin")

        # Copy sync_messages.txt as-is
        if (src_rec / "sync_messages.txt").exists():
            shutil.copy2(src_rec / "sync_messages.txt", dst_rec / "sync_messages.txt")

        # Stub continuous streams (read oebin to discover streams and channel counts).
        # JSON is read as UTF-8 explicitly rather than with the locale default.
        with open(oebin_file, encoding="utf-8") as f:
            oebin = json.load(f)

        for stream_info in oebin.get("continuous", []):
            folder_name = stream_info["folder_name"].rstrip("/")
            n_channels = stream_info["num_channels"]
            src_stream = src_rec / "continuous" / folder_name
            if not src_stream.exists():
                raise FileNotFoundError(
                    f"oebin lists stream '{folder_name}' but {src_stream} does not exist"
                )
            _stub_stream(src_stream, dst_rec / "continuous" / folder_name, n_channels)

        # Stub events (empty arrays preserving dtype)
        if (src_rec / "events").exists():
            _stub_events(src_rec / "events", dst_rec / "events")

print("Done.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment