stub_blackrock
| """ | |
| Script to create stubbed (reduced-size) Blackrock test files. | |
| This creates small test files that preserve: | |
| - All headers (basic + extended) | |
| - A small amount of data (enough to test gap detection) | |
| - The file structure and format | |
| Usage: | |
| python stub_blackrock_files.py <input_dir> <output_dir> --samples <n> | |
| """ | |
import argparse
import struct
from pathlib import Path


def stub_nsx_file(input_path, output_path, max_samples=1000):
    """
    Create a stubbed NSX file with headers + limited data.

    Parameters
    ----------
    input_path : Path
        Path to the original NSX file
    output_path : Path
        Path to write the stubbed file
    max_samples : int
        Maximum number of PTP packets to keep
    """
    with open(input_path, 'rb') as f:
        # Read bytes_in_headers (at offset 10 for v2.2+)
        f.seek(10)
        bytes_in_headers = struct.unpack('<I', f.read(4))[0]

        # Check for PTP format (timestamp resolution = 1,000,000,000)
        # Offset: 8 (file_id) + 1 (ver_major) + 1 (ver_minor) + 4 (bytes_in_headers) +
        #         16 (label) + 256 (comment) + 4 (period) = 290
        f.seek(290)
        timestamp_res = struct.unpack('<I', f.read(4))[0]
        is_ptp = (timestamp_res == 1_000_000_000)

        # Read headers
        f.seek(0)
        header_data = f.read(bytes_in_headers)

        if not is_ptp:
            # For non-PTP, just copy headers + some data
            data_to_keep = f.read(max_samples * 1000)  # Arbitrary size
            packets_kept = "N/A"
        else:
            # For PTP: read packets one by one
            packets = []
            packet_count = 0
            while packet_count < max_samples:
                # Read packet header: 1 (header flag) + 8 (timestamp) + 4 (num_data_points) bytes
                packet_start = f.read(13)
                if len(packet_start) < 13:
                    break
                # Parse num_data_points
                num_points = struct.unpack('<I', packet_start[9:13])[0]
                # Read data (2 bytes per sample per channel).
                # The channel count lives in the extended headers; here we just read
                # what num_points says, assuming 1 channel for simplicity.
                data_bytes = f.read(num_points * 2)
                if len(data_bytes) < num_points * 2:
                    break
                packets.append(packet_start + data_bytes)
                packet_count += 1
            data_to_keep = b''.join(packets)
            packets_kept = packet_count

    # Write stubbed file
    with open(output_path, 'wb') as f:
        f.write(header_data)
        f.write(data_to_keep)

    print(f"Created stub: {output_path.name}")
    if is_ptp:
        print("  Format: PTP")
        print(f"  Packets kept: {packets_kept}")
    else:
        print("  Format: Standard")
    print(f"  Stubbed size: {len(header_data) + len(data_to_keep):,} bytes")


def stub_nev_file(input_path, output_path, max_packets=100):
    """
    Create a stubbed NEV file with headers + limited events.

    Parameters
    ----------
    input_path : Path
        Path to the original NEV file
    output_path : Path
        Path to write the stubbed file
    max_packets : int
        Maximum number of event packets to keep
    """
    with open(input_path, 'rb') as f:
        # Check file ID to determine format
        file_id = f.read(8)

        # Standard NEURALEV format uses offsets 10 and 20,
        # BREVENTS format uses offsets 12 and 16
        if file_id == b'NEURALEV':
            header_offset = 10
            packet_offset = 20
        elif file_id == b'BREVENTS':
            header_offset = 12
            packet_offset = 16
        else:
            raise ValueError(f"Unknown NEV file ID: {file_id}")

        # Read bytes_in_headers
        f.seek(header_offset)
        bytes_in_headers = struct.unpack('<I', f.read(4))[0]

        # Read packet size
        f.seek(packet_offset)
        bytes_in_data_packets = struct.unpack('<I', f.read(4))[0]

        # Read headers
        f.seek(0)
        headers = f.read(bytes_in_headers)

        # Read limited data packets
        packets_data = f.read(max_packets * bytes_in_data_packets)

    # Write stubbed file
    with open(output_path, 'wb') as f:
        f.write(headers)
        f.write(packets_data)

    print(f"Created stub: {output_path.name}")
    print(f"  Format: {file_id.decode('ascii')}")
    print(f"  Headers: {bytes_in_headers:,} bytes")
    print(f"  Packet size: {bytes_in_data_packets} bytes")
    print(f"  Packets kept: {max_packets}")


def main():
    parser = argparse.ArgumentParser(description='Create stubbed Blackrock test files')
    parser.add_argument('input_dir', type=Path, help='Input directory with Blackrock files')
    parser.add_argument('output_dir', type=Path, help='Output directory for stubbed files')
    parser.add_argument('--samples', type=int, default=1000, help='Max samples per NSX file')
    parser.add_argument('--packets', type=int, default=100, help='Max packets for NEV file')
    parser.add_argument('--basename', type=str, help='Base filename (without extension)')
    args = parser.parse_args()

    # Create output directory
    args.output_dir.mkdir(parents=True, exist_ok=True)

    # Find all Blackrock files
    input_files = list(args.input_dir.glob("*.ns*")) + list(args.input_dir.glob("*.nev"))
    if args.basename:
        input_files = [f for f in input_files if f.stem.startswith(args.basename)]

    for input_file in input_files:
        output_file = args.output_dir / input_file.name
        try:
            if input_file.suffix == '.nev':
                stub_nev_file(input_file, output_file, args.packets)
            elif input_file.suffix.startswith('.ns'):
                stub_nsx_file(input_file, output_file, args.samples)
            else:
                print(f"Skipping unknown file type: {input_file}")
        except Exception as e:
            print(f"Error processing {input_file.name}: {e}")


if __name__ == '__main__':
    main()
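
Example invocation (the directory names and the --basename value below are illustrative):

    python stub_blackrock_files.py /data/blackrock_session ./stubbed_files --samples 500 --packets 100 --basename session001

A minimal sketch for sanity-checking a stub, assuming the files are meant to be read back with neo's BlackrockRawIO (the path and basename are hypothetical):

    from neo.rawio import BlackrockRawIO

    # filename is the shared basename; the reader locates the matching .nsX/.nev files
    reader = BlackrockRawIO(filename="./stubbed_files/session001")
    reader.parse_header()
    print(reader)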