Last active
June 25, 2024 08:55
-
-
Save vrbadev/ecf8e9dfbc7bc6a93d6c40b7d9548328 to your computer and use it in GitHub Desktop.
Script for extracting events from a RAW file encoded in the EVT3 format into a CSV file. The script is pure Python and needs no packages beyond the standard library (tqdm is optional, for the progress bar).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""
Created on Mon Jun 24 18:16:30 2024
@author: Vojtech Vrba ([email protected])
Script for extracting events stored in a RAW file in the EVT3 format into a CSV file.
References:
- https://docs.prophesee.ai/stable/data/encoding_formats/evt3.html#chapter-data-encoding-formats-evt3
"""
import argparse | |
import os, sys | |
def num_from_bits(value, start=0, length=1):
    """Return the ``length``-bit field of ``value`` that begins at bit ``start``.

    Bit 0 is the least significant bit, so ``num_from_bits(word, 12, 4)``
    yields the top nibble of a 16-bit word.

    Args:
        value: Integer to extract the field from.
        start: Index of the lowest bit of the field.
        length: Width of the field in bits; a non-positive width yields 0.

    Returns:
        The extracted field as a non-negative integer.
    """
    if length <= 0:
        return 0
    # One shift and one mask instead of summing the bits one at a time.
    return (value >> start) & ((1 << length) - 1)
class Evt3(object):
    """Stateful decoder for a stream of 16-bit EVT3 words.

    Feed the words one at a time, in file order, to :meth:`process`. The
    decoder keeps the running timestamp, the current row (y) and the base
    column / polarity used by the vector words, and returns the events that
    each word completes.

    Every returned event is a tuple ``(x, y, polarity, timestamp)``. External
    trigger words reuse that shape as ``(-1, channel_id, value, timestamp)``.
    """

    # Event type codes, stored in bits 15..12 of every word.
    EVT_ADDR_Y = 0x0
    EVT_ADDR_X = 0x2
    VECT_BASE_X = 0x3
    VECT_12 = 0x4
    VECT_8 = 0x5
    EVT_TIME_LOW = 0x6
    CONTINUED_4 = 0x7
    EVT_TIME_HIGH = 0x8
    EXT_TRIGGER = 0xA
    OTHERS = 0xE
    CONTINUED_12 = 0xF

    def __init__(self):
        """Reset the decoder; no event is emitted before the first EVT_TIME_HIGH."""
        self.first_time_base = None  # set by the first EVT_TIME_HIGH word
        self.current_time_base = 0  # upper timestamp part, incl. wrap-arounds
        self.current_time = 0  # full timestamp attached to emitted events
        self.current_ev_addr_y = 0
        self.current_base_x = 0
        self.current_polarity = 0
        self.n_time_high_loop = 0  # number of TIME_HIGH wrap-arounds seen
        # Largest time base a single 12-bit TIME_HIGH value can express.
        self.max_timestamp_base = ((1 << 12) - 1) << 12
        # Amount added to the time base for every TIME_HIGH wrap-around.
        self.time_loop = self.max_timestamp_base + (1 << 12)
        # Tolerance used when deciding whether a backwards jump is a wrap.
        self.loop_threshold = (10 << 12)

    def _vector_events(self, word, width):
        """Expand a VECT_12/VECT_8 bitmask into events and advance the base column."""
        base_x = self.current_base_x
        events = [
            (base_x + bit, self.current_ev_addr_y, self.current_polarity, self.current_time)
            for bit in range(width)
            if word & (1 << bit)
        ]
        self.current_base_x = base_x + width
        return events

    def process(self, word):
        """Decode one 16-bit EVT3 word and update the decoder state.

        Args:
            word: The word as an integer; bits 15..12 hold the event type and
                bits 11..0 the payload.

        Returns:
            A list of ``(x, y, polarity, timestamp)`` tuples produced by this
            word. The list is empty for words that only update state, for
            ignored word types, and for every word seen before the first
            EVT_TIME_HIGH. External triggers are reported as
            ``(-1, channel_id, value, timestamp)``.

        Raises:
            ValueError: If the event type is not one of the known codes. This
                is only checked once the first EVT_TIME_HIGH has been seen.
        """
        event_type = (word >> 12) & 0xF  # always the top 4 bits
        payload = word & 0xFFF

        # Skip every word until the first EVT_TIME_HIGH gives a time base.
        if self.first_time_base is None:
            if event_type == Evt3.EVT_TIME_HIGH:
                self.current_time_base = payload << 12
                self.current_time = self.current_time_base
                self.first_time_base = self.current_time_base
            return []

        evts = []
        if event_type == Evt3.EVT_ADDR_X:
            # Single event: x in bits 10..0, polarity in bit 11.
            x = payload & 0x7FF
            pol = (payload >> 11) & 0x1
            evts = [(x, self.current_ev_addr_y, pol, self.current_time)]
        elif event_type == Evt3.VECT_12:
            evts = self._vector_events(word, 12)
        elif event_type == Evt3.VECT_8:
            evts = self._vector_events(word, 8)
        elif event_type == Evt3.EVT_ADDR_Y:
            # Only y (bits 10..0) is used. Per the EVT3 specification bit 11
            # of this word identifies the originating system, not a polarity,
            # so it must not overwrite current_polarity.
            self.current_ev_addr_y = payload & 0x7FF
        elif event_type == Evt3.VECT_BASE_X:
            # Base column and polarity for the VECT_12/VECT_8 words that follow.
            self.current_base_x = payload & 0x7FF
            self.current_polarity = (payload >> 11) & 0x1
        elif event_type == Evt3.EVT_TIME_HIGH:
            new_time_base = (payload << 12) + self.n_time_high_loop * self.time_loop
            # A jump backwards by (almost) the whole 12-bit range means the
            # TIME_HIGH counter wrapped around rather than time going back.
            if (self.current_time_base > new_time_base
                    and self.current_time_base - new_time_base
                    >= self.max_timestamp_base - self.loop_threshold):
                new_time_base += self.time_loop
                self.n_time_high_loop += 1
            self.current_time_base = new_time_base
            self.current_time = new_time_base
        elif event_type == Evt3.EVT_TIME_LOW:
            self.current_time = self.current_time_base + payload
        elif event_type == Evt3.EXT_TRIGGER:
            # Trigger value in bit 0, channel id in bits 11..8.
            value = payload & 0x1
            channel_id = (payload >> 8) & 0xF
            evts = [(-1, channel_id, value, self.current_time)]
        elif event_type in (Evt3.CONTINUED_4, Evt3.CONTINUED_12, Evt3.OTHERS):
            # Recognised word types whose payload this script does not use.
            pass
        else:
            raise ValueError("Unknown event type: 0x%02X" % event_type)
        return evts
if __name__ == "__main__":
    # Command-line entry point: validate the arguments, echo the RAW header,
    # then decode every 16-bit EVT3 word and write the events to the CSV file.
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", type=str, help="Path to the input RAW file.")
    parser.add_argument("-o", "--output", type=str, help="Path to the output CSV file.")
    parser.add_argument("-s", "--pstep", type=int, help="Progress update step (kB). Default: 20.", default=20)
    parser.add_argument("-p", "--pbar", help="Show progress bar (requires tqdm package).", action="store_true")

    print("Script for EVT3 events extraction from RAW file recordings into CSV.")

    args = parser.parse_args()

    if len(sys.argv) == 1 or args.input is None or args.output is None:
        parser.print_help()
        sys.exit(-1)

    if not args.input.endswith(".raw"):
        print("Error: The name of the provided input file '%s' does not end with RAW extension!" % args.input, file=sys.stderr)
        sys.exit(-2)

    if not os.path.exists(args.input):
        print("Error: The provided input file '%s' does not exist!" % args.input, file=sys.stderr)
        sys.exit(-3)

    if not args.output.endswith(".csv"):
        print("Error: The name of the provided output file '%s' does not end with CSV extension!" % args.output, file=sys.stderr)
        sys.exit(-4)

    # Refuse to overwrite an existing output file.
    if os.path.exists(args.output):
        print("Error: The provided output file '%s' already exists!" % args.output, file=sys.stderr)
        sys.exit(-5)

    raw_file_size = os.stat(args.input).st_size
    print("Input path: %s (%d bytes)" % (args.input, raw_file_size))
    print("Output path: %s" % (args.output))

    with open(args.input, "rb") as raw_file:
        # The header is a run of text lines starting with '%'; it ends at the
        # "% end" line or at the first line that does not start with '%'.
        header_lines = []
        while byte := raw_file.read(1):
            if byte != b"%":
                raw_file.seek(-1, 1)  # not a header line: give the byte back
                break
            # Undecodable bytes are replaced so an odd header cannot abort the run.
            line = (byte + raw_file.readline()).decode(errors="replace")
            header_lines.append(line)
            if "% end" in line:
                break
        header_str = "".join(header_lines)
        # Byte offset of the first event word; the decoded header text may have
        # a different length than the header has bytes.
        data_offset = raw_file.tell()

        print("RAW file header:")
        print(header_str)
        print("Starting extraction...")

        evt3 = Evt3()
        raw_words_count = (raw_file_size - data_offset) // 2
        # --pstep is in kB and one word is 2 bytes, hence 500 words per kB.
        # Clamp to at least 1 so that "--pstep 0" cannot cause a modulo by zero.
        progress_update_step = max(1, abs(args.pstep) * 500)
        events_count = 0

        if args.pbar:
            from tqdm import tqdm
            pbar = tqdm(total=raw_words_count)

        with open(args.output, "w") as csv_file:
            csv_file.write("t,x,y,p\n")
            words_done = 0
            # Only complete 2-byte words are decoded; a trailing odd byte is ignored.
            while len(word := raw_file.read(2)) == 2:
                events = evt3.process(int.from_bytes(word, "little"))
                if events:
                    # Events are (x, y, p, t) tuples; the CSV column order is t,x,y,p.
                    csv_file.writelines("%d,%d,%d,%d\n" % (t, x, y, p) for x, y, p, t in events)
                    events_count += len(events)
                words_done += 1
                if words_done % progress_update_step == 0 or words_done == raw_words_count:
                    if args.pbar:
                        pbar.set_description("Events count: %d | Progress" % events_count)
                        pbar.update(words_done - pbar.n)
                    else:
                        print("\rProgress: %.2f %% | Events count: %d" % (100 * words_done / raw_words_count, events_count), end="")

        if args.pbar:
            pbar.close()

    print("\nScript done.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment