Last active
January 18, 2016 11:07
-
-
Save lucainnocenti/8df2f5b2f5134671b6a1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
"""Find fourfold coincidences in a binary file of 5-byte timetags.

The input file starts with a fixed-size header that is skipped, followed by
a sequence of 5-byte records.  The first 4 bytes of each record hold the
timestamp (least significant byte first) and the fifth byte holds the
channel.  Consecutive records whose timestamps fall within WINDOW_SIZE of
the first record of a group are collected together; every group made of
exactly four records is printed to standard output.
"""
import struct
import os
import time

# ============ CONSTANTS TO (OPTIONALLY) SET =============
WINDOW_SIZE = 100
INPUT_FILE_NAME = 'data_lab\\timetags5byte.bin'

# ======== CODE TO READ AND PARSE 5 BYTE BINARY FILE =========================
# number of bytes found in every file produced when writing the binary file
INITIAL_OFFSET = 40
# Each timetag is stored in 5 bytes: a 4-byte timestamp (least significant
# byte first) followed by a 1-byte channel.  The '<' prefix selects
# little-endian byte order with *standard* sizes, so the record is 5 bytes
# on every platform; a bare 'LB' uses the native size of 'L', which is
# 8 bytes on most 64-bit Unix systems and makes every unpack fail.
TIMETAG_STRUCT = struct.Struct('<LB')


def read_timetags(stream, offset=INITIAL_OFFSET):
    """Yield ``(timestamp, channel)`` tuples read from a binary stream.

    Args:
        stream: seekable binary file object containing the timetags.
        offset: number of header bytes to skip before the first record.

    Yields:
        One ``(timestamp, channel)`` tuple per complete 5-byte record.
        A truncated trailing record (fewer than 5 bytes) is ignored.
    """
    stream.seek(offset)
    record_size = TIMETAG_STRUCT.size
    while True:
        record = stream.read(record_size)
        if len(record) < record_size:
            # End of data (or an incomplete last record): stop cleanly
            # instead of relying on struct.error for control flow.
            return
        yield TIMETAG_STRUCT.unpack(record)


def find_fourfold_coincidences(timetags, window_size=WINDOW_SIZE):
    """Yield the channel lists of every fourfold coincidence.

    The first timetag of a group is the trigger.  Following timetags join
    the group while ``0 < timestamp - trigger < window_size``; the first
    timetag failing that test closes the group and becomes the next
    trigger.  Only groups with exactly four members are yielded.

    Args:
        timetags: iterable of ``(timestamp, channel)`` tuples.
        window_size: exclusive upper bound on the timestamp distance from
            the trigger for a timetag to belong to the same group.

    Yields:
        Lists of four channel numbers, in the order they were read.
    """
    timetags = iter(timetags)
    first = next(timetags, None)
    if first is None:
        # No records at all: nothing to report.
        return
    triggering_timestamp, channel = first
    channels = [channel]
    for timestamp, channel in timetags:
        # NOTE(review): the lower bound is strict, so a timetag with the
        # same timestamp as the trigger (or an earlier one) starts a new
        # group instead of joining the current one -- confirm this is the
        # intended behaviour.
        if 0 < timestamp - triggering_timestamp < window_size:
            channels.append(channel)
            continue
        if len(channels) == 4:
            yield channels
        channels = [channel]
        triggering_timestamp = timestamp
    # The last group is never closed by a following timetag; check it here.
    if len(channels) == 4:
        yield channels


def print_channels(channels):
    """Print one coincidence, each channel right-aligned in 5 columns."""
    print('found a fourfold coincidence at ', end='')
    for channel in channels:
        print('{0: >5}'.format(channel), end='')
    print('')


def main():
    """Process INPUT_FILE_NAME, print the coincidences and the run time.

    Returns:
        The list of channel lists of all fourfold coincidences found.
    """
    start_time = time.time()
    output_channels = []
    with open(INPUT_FILE_NAME, 'rb') as input_file:
        for channels in find_fourfold_coincidences(read_timetags(input_file)):
            print_channels(channels)
            output_channels.append(channels)
    print('Finished in ' + str(time.time() - start_time))
    return output_channels


if __name__ == '__main__':
    # Kept as a module-level name so it can still be inspected after the
    # script has run (e.g. with ``python -i``).
    output_channels = main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment