Last active
January 11, 2024 10:27
-
-
Save alastairtree/7b0b1dcaffac6e46791045a8e683f67f to your computer and use it in GitHub Desktop.
This scripts takes a binary file containing CCSDS packets and splits them into one file per packet. Assumes CCSDS packets always have a secondary header, and that the first 4 bytes of data are a SCLK timestamp in seconds with 2010-01-01 as epoch as per the NASA IMAP Mission spec.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# This scripts takes a binary file containing CCSDS packets and splits them into one file per packet | |
# Usage > python3 PacketSplitter.py -i <InputFile> -o <outputFiles> | |
import datetime | |
import sys, getopt, os | |
from os.path import exists | |
HEADER_LENGTH = 6 | |
SCLK_LENGTH = 4 | |
def main(argv): | |
inputfile = "" | |
outputfile = "" | |
helptext = "Example: python3 PacketSplitter.py -i <InputFile> -o <outputFiles>" | |
# parse arguments from command line | |
try: | |
opts, args = getopt.getopt(argv, "hi:o:", ["ifile=", "ofile="]) | |
except getopt.GetoptError: | |
print(helptext) | |
sys.exit(2) | |
for opt, arg in opts: | |
if opt == "-h": | |
print(helptext) | |
sys.exit() | |
elif opt in ("-i", "--ifile"): | |
inputfile = arg | |
elif opt in ("-o", "--ofile"): | |
outputfile = arg | |
# validate arguments | |
if not exists(inputfile): | |
print(f"Input file {inputfile} not found") | |
sys.exit(3) | |
outfilename, outfile_extension = os.path.splitext(outputfile) | |
# We are ready to proceed - inform user | |
inputFileSize = os.path.getsize(inputfile) | |
print(f"Input file {inputfile} is {inputFileSize} bytes") | |
bytesWritten = 0 | |
packetDone = False | |
packetBytes = bytes() | |
ApId = 0 | |
SeqCnt = 0 | |
sclk = 0 | |
length = 0 | |
packetTime = datetime.datetime.now() | |
packetCount = 0 | |
outputfile1 = "" | |
with open(inputfile, "rb") as inputFileStream: | |
byte = inputFileStream.read(1) | |
while byte: | |
packetBytes += byte | |
byte = inputFileStream.read(1) | |
if len(packetBytes) == HEADER_LENGTH + SCLK_LENGTH: | |
ApId = ((packetBytes[0] << 8) | (packetBytes[1] << 0)) & 0x7FF | |
SeqCnt = ((packetBytes[2] << 8) | (packetBytes[3] << 0)) & 0x3FFF | |
length = int.from_bytes(packetBytes[4:6], byteorder="big") | |
sclk = int.from_bytes(packetBytes[6:10], byteorder="big") | |
packetTime = datetime.datetime( | |
2010, 1, 1, 0, 0, 0, 0, tzinfo=datetime.timezone.utc | |
) + datetime.timedelta(seconds=sclk) | |
outputfile1 = f'{outfilename}_{packetTime.strftime("%Y%m%d")}_{packetTime.strftime("%H%M%S")}_ApId_{hex(ApId)}_Seq_{SeqCnt}{outfile_extension}' | |
print( | |
f'Packet found: APID={hex(ApId)}, SeqCnt={SeqCnt}, Length={length}, SCLK={sclk}, Time={packetTime.strftime("%Y%m%d %H%M%S")}, File={outputfile1}' | |
) | |
if length != 0 and len(packetBytes) == length + 1 + HEADER_LENGTH: | |
packetDone = True | |
if packetDone: | |
if exists(outputfile1): | |
print(f'Output file {outputfile1} already exists! Quitting...') | |
sys.exit(4) | |
with open(outputfile1, "wb") as outputFileStream1: | |
outputFileStream1.write(packetBytes) | |
bytesWritten += len(packetBytes) | |
outputFileStream1.close() | |
packetCount += 1 | |
packetDone = False | |
packetBytes = bytes() | |
ApId = 0 | |
SeqCnt = 0 | |
length = 0 | |
sclk = 0 | |
packetTime = datetime.datetime.now() | |
inputFileStream.close() | |
print( | |
f"{bytesWritten} bytes written across {packetCount} files. Packet Spliiting is complete :-)" | |
) | |
if __name__ == "__main__": | |
main(sys.argv[1:]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment