Skip to content

Instantly share code, notes, and snippets.

@h-mayorquin
Created March 17, 2026 19:20
Show Gist options
  • Select an option

  • Save h-mayorquin/d6ba91365dfaecbf1aab1dd85838c586 to your computer and use it in GitHub Desktop.

Select an option

Save h-mayorquin/d6ba91365dfaecbf1aab1dd85838c586 to your computer and use it in GitHub Desktop.
stub_open_ephys_binary
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "numpy",
# ]
# ///
"""
Generate a stubbed version of the OneBox NP2014 dataset for testing.

Takes the full recording and produces a 1-second stub with anonymized
serial numbers. The stub preserves the full directory structure, all
metadata files (settings.xml, structure.oebin, sync_messages.txt),
and real neural data for the first 30,000 samples. Event .npy files are
kept in place but replaced by empty arrays of the same dtype.

Usage:
    uv run generate_stub.py 2025-11-17_10-48-37 stub/2025-11-17_10-48-37

The source data is the recording shared by SplintZ in SpikeInterface issue #4394.
"""
import json
import shutil
import sys
import xml.etree.ElementTree as ET
from pathlib import Path
import numpy as np
# Number of leading samples kept from every continuous stream (the "1-second
# stub" described in the module docstring).
SAMPLES = 30_000

# NP_PROBE attributes in settings*.xml that identify physical hardware; every
# one that is present gets overwritten with "0".
SERIAL_NUMBER_ATTRS = [
    "probe_serial_number",
    "custom_probe_name",
    "headstage_serial_number",
    "bsc_serial_number",
    "bs_serial_number",
]


def _anonymize_settings(src_file, dst_file):
    """Write a copy of one settings XML file with identifying values replaced."""
    tree = ET.parse(str(src_file))
    root = tree.getroot()

    machine = root.find("INFO/MACHINE")
    if machine is not None:
        machine.set("name", "ANONYMOUS")
        machine.set("cpu_model", "Anonymous CPU")

    for probe in root.iter("NP_PROBE"):
        for attr in SERIAL_NUMBER_ATTRS:
            if attr in probe.attrib:
                probe.set(attr, "0")

    # Explicit UTF-8 keeps the written bytes and the XML declaration
    # independent of the locale's preferred encoding.
    tree.write(str(dst_file), encoding="utf-8", xml_declaration=True)


def _stub_continuous_stream(src_stream, dst_stream, n_channels):
    """Copy the first SAMPLES samples of one continuous stream folder."""
    dst_stream.mkdir(parents=True, exist_ok=True)

    # continuous.dat is read as raw bytes: 2 bytes (int16) per channel per sample.
    n_bytes = SAMPLES * n_channels * 2
    with open(src_stream / "continuous.dat", "rb") as fin:
        head = fin.read(n_bytes)
    with open(dst_stream / "continuous.dat", "wb") as fout:
        fout.write(head)

    for fname in ("sample_numbers.npy", "timestamps.npy"):
        # Memory-map the source so only the retained head is read from disk,
        # then copy it into a regular array before saving.
        mapped = np.load(src_stream / fname, mmap_mode="r")
        np.save(dst_stream / fname, np.array(mapped[:SAMPLES]))
        del mapped


def _save_empty_like(src_npy, dst_npy):
    """Save an empty 1-D array with the same dtype as the array in src_npy."""
    orig = np.load(src_npy)
    np.save(dst_npy, np.array([], dtype=orig.dtype))


def _stub_events(src_events, dst_events):
    """Mirror the events folder, replacing every .npy file by an empty array.

    Handles both layouts found under ``events/<group>/``: .npy files placed
    directly in the group folder, and .npy files one folder deeper.
    """
    for group in src_events.iterdir():
        if not group.is_dir():
            continue
        for entry in group.iterdir():
            if entry.is_dir():
                dst_dir = dst_events / group.name / entry.name
                dst_dir.mkdir(parents=True, exist_ok=True)
                for npy_file in entry.glob("*.npy"):
                    _save_empty_like(npy_file, dst_dir / npy_file.name)
            elif entry.suffix == ".npy":
                dst_dir = dst_events / group.name
                dst_dir.mkdir(parents=True, exist_ok=True)
                _save_empty_like(entry, dst_dir / entry.name)


def _stub_recording(src_rec, dst_rec):
    """Stub one recording folder (the folder that holds structure.oebin).

    Raises:
        FileNotFoundError: if structure.oebin lists a continuous stream whose
            folder does not exist, or a stream folder lacks an expected file.
    """
    dst_rec.mkdir(parents=True, exist_ok=True)

    # structure.oebin and sync_messages.txt are copied unchanged.
    oebin_file = src_rec / "structure.oebin"
    shutil.copy2(oebin_file, dst_rec / "structure.oebin")
    sync_file = src_rec / "sync_messages.txt"
    if sync_file.exists():
        shutil.copy2(sync_file, dst_rec / "sync_messages.txt")

    # The oebin tells us which stream folders exist and how many channels
    # each one has (needed to size the binary slice).
    with open(oebin_file, encoding="utf-8") as f:
        oebin = json.load(f)

    for stream_info in oebin.get("continuous", []):
        folder_name = stream_info["folder_name"].rstrip("/")
        n_channels = stream_info["num_channels"]
        src_stream = src_rec / "continuous" / folder_name
        if not src_stream.exists():
            raise FileNotFoundError(
                f"oebin lists stream '{folder_name}' but {src_stream} does not exist"
            )
        _stub_continuous_stream(
            src_stream, dst_rec / "continuous" / folder_name, n_channels
        )

    src_events = src_rec / "events"
    if src_events.exists():
        _stub_events(src_events, dst_rec / "events")


def main(argv=None):
    """Create a stubbed copy of a recording session.

    Args:
        argv: Full argument vector ``[program, source_session_folder,
            destination_session_folder]``. Defaults to ``sys.argv``.

    Exits with status 1 (via ``sys.exit``) when the argument count is wrong or
    the source contains no ``*/settings.xml``.
    """
    args = sys.argv if argv is None else list(argv)
    if len(args) != 3:
        prog = args[0] if args else "generate_stub.py"
        print(f"Usage: {prog} <source_session_folder> <destination_session_folder>")
        sys.exit(1)

    src_session = Path(args[1])
    dst_session = Path(args[2])

    # Record Node folders are the direct children that contain settings.xml.
    record_nodes = [p.parent for p in sorted(src_session.glob("*/settings.xml"))]
    if not record_nodes:
        print(f"No Record Node folders found in {src_session}")
        sys.exit(1)

    # Create the destination only once the source is known to be usable, so a
    # failed run does not leave an empty folder behind.
    dst_session.mkdir(parents=True, exist_ok=True)

    for src_node in record_nodes:
        node_name = src_node.name
        dst_node = dst_session / node_name
        dst_node.mkdir(parents=True, exist_ok=True)
        print(f"Stubbing {node_name}...")

        # settings.xml plus any settings_N.xml written for extra experiments.
        for settings_file in sorted(src_node.glob("settings*.xml")):
            _anonymize_settings(settings_file, dst_node / settings_file.name)

        # Every structure.oebin below the node marks one recording folder.
        for oebin_file in sorted(src_node.rglob("structure.oebin")):
            src_rec = oebin_file.parent
            _stub_recording(src_rec, dst_node / src_rec.relative_to(src_node))

    print("Done.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment