Skip to content

Instantly share code, notes, and snippets.

@h-mayorquin
Created October 8, 2025 13:50
Show Gist options
  • Select an option

  • Save h-mayorquin/5b0d66dd2352a20e273ca7eae8d3a6cb to your computer and use it in GitHub Desktop.

Select an option

Save h-mayorquin/5b0d66dd2352a20e273ca7eae8d3a6cb to your computer and use it in GitHub Desktop.
stub_blackrock
"""
Script to create stubbed (reduced-size) Blackrock test files.
This creates small test files that preserve:
- All headers (basic + extended)
- A small amount of data (enough to test gap detection)
- The file structure and format
Usage:
python stub_blackrock_files.py <input_dir> <output_dir> --samples <n>
"""
import numpy as np
import struct
from pathlib import Path
import argparse
def stub_nsx_file(input_path, output_path, max_samples=1000):
    """
    Create a stubbed NSX file with headers + limited data.

    Parameters
    ----------
    input_path : Path
        Path to original NSX file
    output_path : Path
        Path to write stubbed file
    max_samples : int
        Maximum number of PTP packets to keep. For non-PTP files an
        arbitrary chunk of ``max_samples * 1000`` data bytes is kept instead.
    """
    with open(input_path, 'rb') as f:
        # Read bytes_in_headers (at offset 10 for v2.2+)
        f.seek(10)
        bytes_in_headers = struct.unpack('<I', f.read(4))[0]

        # Check for PTP format (timestamp resolution = 1,000,000,000)
        # Offset: 8 (file_id) + 1 (ver_major) + 1 (ver_minor)
        #       + 4 (bytes_in_headers) + 16 (label) + 256 (comment)
        #       + 4 (period) = 290
        f.seek(290)
        timestamp_res = struct.unpack('<I', f.read(4))[0]
        is_ptp = (timestamp_res == 1_000_000_000)

        # Copy the full header block (basic + extended headers)
        f.seek(0)
        header_data = f.read(bytes_in_headers)

        if not is_ptp:
            # For non-PTP, just copy headers + some data (arbitrary size)
            data_to_keep = f.read(max_samples * 1000)
            packets_kept = None  # not meaningful for the non-PTP branch
        else:
            # For PTP: read packets one by one until max_samples is reached
            packets = []
            packet_count = 0
            while packet_count < max_samples:
                # Packet header: 1 (header byte) + 8 (timestamp) + 4 (num_data_points)
                packet_start = f.read(13)
                if len(packet_start) < 13:
                    break  # truncated packet header -> end of data
                num_points = struct.unpack('<I', packet_start[9:13])[0]
                # Read sample data: 2 bytes per sample.
                # NOTE(review): assumes 1 channel for simplicity; a
                # multi-channel file would need the channel count from the
                # extended headers — confirm before relying on this for
                # multi-channel recordings.
                data_bytes = f.read(num_points * 2)
                if len(data_bytes) < num_points * 2:
                    break  # truncated data section -> stop
                packets.append(packet_start + data_bytes)
                packet_count += 1
            data_to_keep = b''.join(packets)
            packets_kept = packet_count

    # Write stubbed file: headers followed by the retained data
    with open(output_path, 'wb') as f:
        f.write(header_data)
        f.write(data_to_keep)

    print(f"Created stub: {output_path.name}")
    if is_ptp:
        print(f" Format: PTP")
        print(f" Packets kept: {packets_kept}")
    else:
        print(f" Format: Standard")
    print(f" Stubbed size: {len(header_data) + len(data_to_keep):,} bytes")
def stub_nev_file(input_path, output_path, max_packets=100):
    """
    Create a stubbed NEV file containing the full headers plus a limited
    number of event packets.

    Parameters
    ----------
    input_path : Path
        Path to original NEV file
    output_path : Path
        Path to write stubbed file
    max_packets : int
        Maximum number of event packets to keep
    """
    # The 8-byte file ID selects where the header-size and packet-size
    # fields live: NEURALEV uses offsets (10, 20), BREVENTS uses (12, 16).
    field_offsets = {
        b'NEURALEV': (10, 20),
        b'BREVENTS': (12, 16),
    }

    with open(input_path, 'rb') as src:
        file_id = src.read(8)
        if file_id not in field_offsets:
            raise ValueError(f"Unknown NEV file ID: {file_id}")
        header_offset, packet_offset = field_offsets[file_id]

        # Total size of the header region (basic + extended)
        src.seek(header_offset)
        bytes_in_headers = struct.unpack('<I', src.read(4))[0]

        # Fixed size of each data packet
        src.seek(packet_offset)
        bytes_in_data_packets = struct.unpack('<I', src.read(4))[0]

        # Copy all headers, then at most max_packets worth of packet bytes
        src.seek(0)
        headers = src.read(bytes_in_headers)
        packets_data = src.read(max_packets * bytes_in_data_packets)

    with open(output_path, 'wb') as dst:
        dst.write(headers)
        dst.write(packets_data)

    print(f"Created stub: {output_path.name}")
    print(f" Format: {file_id.decode('ascii')}")
    print(f" Headers: {bytes_in_headers:,} bytes")
    print(f" Packet size: {bytes_in_data_packets} bytes")
    print(f" Packets kept: {max_packets}")
def main():
    """Parse CLI arguments and stub every Blackrock file in the input dir."""
    parser = argparse.ArgumentParser(description='Create stubbed Blackrock test files')
    parser.add_argument('input_dir', type=Path, help='Input directory with Blackrock files')
    parser.add_argument('output_dir', type=Path, help='Output directory for stubbed files')
    parser.add_argument('--samples', type=int, default=1000, help='Max samples per NSX file')
    parser.add_argument('--packets', type=int, default=100, help='Max packets for NEV file')
    parser.add_argument('--basename', type=str, help='Base filename (without extension)')
    args = parser.parse_args()

    # Make sure the destination exists before writing any stubs
    args.output_dir.mkdir(parents=True, exist_ok=True)

    # Gather NSX (*.ns*) and NEV files, optionally filtered by basename
    candidates = [*args.input_dir.glob("*.ns*"), *args.input_dir.glob("*.nev")]
    if args.basename:
        candidates = [p for p in candidates if p.stem.startswith(args.basename)]

    for source in candidates:
        target = args.output_dir / source.name
        suffix = source.suffix
        try:
            if suffix == '.nev':
                stub_nev_file(source, target, args.packets)
            elif suffix.startswith('.ns'):
                stub_nsx_file(source, target, args.samples)
            else:
                print(f"Skipping unknown file type: {source}")
        except Exception as e:
            # Best-effort batch processing: report and continue with the rest
            print(f"Error processing {source.name}: {e}")


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment