Created
May 3, 2025 00:45
-
-
Save jdboachie/64d11dc4a55f5faccddff44f53c4d7f0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gi # type:ignore | |
import os | |
import csv | |
import time | |
import glob | |
import logging | |
import fractions | |
import subprocess | |
import tkinter as tk | |
import matplotlib.pyplot as plt # type:ignore | |
from tkinter import filedialog | |
from datetime import datetime | |
from constants import ( | |
KLV_PACKET_SIZE, | |
OUT_FOLDER, | |
CSV_FOLDER, | |
KLV_FOLDER, | |
CSV_ENCODING, | |
EARTH_MEAN_RADIUS, | |
VIDEO_FRAMES_FOLDER, | |
UASLocalMetadataSet, | |
) | |
from math import tan, radians, degrees, cos, pi, sin | |
from klv_data.common import ( # type:ignore | |
datetime_to_bytes, | |
int_to_bytes, | |
) | |
from klv_data.misb0601 import ( # type:ignore | |
PlatformHeadingAngle, | |
PlatformPitchAngle, | |
SlantRange, | |
PlatformRollAngle, | |
SensorLatitude, | |
FrameCenterElevation, | |
SensorLongitude, | |
SensorTrueAltitude, | |
TargetWidth, | |
SensorHorizontalFieldOfView, | |
SensorRelativeElevationAngle, | |
SensorEllipsoidHeightConversion, | |
PlatformRollAngleFull, | |
PlatformPitchAngleFull, | |
SensorRelativeAzimuthAngle, | |
SensorVerticalFieldOfView, | |
PrecisionTimeStamp, | |
SensorRelativeRollAngle, | |
FrameCenterLatitude, | |
Checksum, | |
FrameCenterLongitude, | |
CornerLatitudePoint1Full, | |
CornerLongitudePoint1Full, | |
CornerLatitudePoint2Full, | |
CornerLongitudePoint2Full, | |
CornerLatitudePoint3Full, | |
CornerLongitudePoint3Full, | |
CornerLatitudePoint4Full, | |
CornerLongitudePoint4Full, | |
) | |
gi.require_version('Gst', '1.0') | |
from gi.repository import Gst, GObject # type:ignore | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s %(levelname).5s\t%(filename)s\t%(lineno)d\t%(message)s", | |
datefmt="%H:%M:%S", | |
) | |
class Multiplexor: | |
def __init__( | |
self, | |
log_path: str, | |
video_path: str, | |
_verbose: bool = False | |
): | |
if not os.path.exists(log_path): | |
raise FileNotFoundError("Log file does not exist") | |
if not os.path.exists(video_path): | |
raise FileNotFoundError("Video file does not exist") | |
self._verbose = _verbose | |
self.log_path = log_path | |
self.video_path = video_path | |
self._video_frames_dir = VIDEO_FRAMES_FOLDER | |
self.bitrate = self.get_bitrate() | |
self.frame_rate = self.get_frame_rate() # fps | |
self.video_size = self.get_video_size() | |
self.is_push_buffer_allowed = None | |
GObject.threads_init() | |
Gst.init(None) | |
self.cap = None # seems to be unused | |
klv_file = self.generate_klv() | |
self.fh = open(klv_file, 'rb') | |
self.vid_frame_counter = 1 | |
self.klv_frame_counter = 0 | |
self.inject_klv = 0 # Doesn't look like this is used | |
self.duration = 1.0 / self.frame_rate * Gst.SECOND | |
self.dts = 0 # looks unused | |
self.fps = int(self.frame_rate) | |
# Create GStreamer Pipeline | |
self.createGstPipeline() | |
def createGstPipeline(self): | |
# video source elements | |
self.vsrc = Gst.ElementFactory.make("appsrc", "vidsrc") | |
self.vqueue = Gst.ElementFactory.make("queue") | |
self.vtee = Gst.ElementFactory.make("tee") | |
# klv source elements | |
self.appsrc = Gst.ElementFactory.make("appsrc") | |
self.queue_klv = Gst.ElementFactory.make("queue") | |
# recording elements | |
self.queue_record = Gst.ElementFactory.make("queue") | |
self.vcvt_encoder = Gst.ElementFactory.make("videoconvert") | |
# encode | |
self.encoder = Gst.ElementFactory.make("x264enc") | |
self.muxer = Gst.ElementFactory.make("mpegtsmux") | |
self.filesink = Gst.ElementFactory.make("filesink") | |
# configure video element | |
self.caps_str = "video/x-raw" | |
self.caps_str += ",format=(string)RGB,width={},height={}".format(self.width, self.height) | |
self.caps_str += ",framerate={}/1".format(int(self.frame_rate)) | |
self.vcaps = Gst.Caps.from_string(self.caps_str) | |
self.vsrc.set_property("caps", self.vcaps) | |
self.vsrc.set_property("format", Gst.Format.TIME) | |
#self.vsrc.set_property("is-live", True) | |
self.vsrc.connect("need-data", self.video_need_data) | |
self.vsrc.connect("enough-data", self.video_enough_data) | |
# configure appsrc element | |
self.caps_str = "meta/x-klv" | |
self.caps_str += ",parsed=True" | |
self.caps = Gst.Caps.from_string(self.caps_str) | |
self.appsrc.set_property("caps", self.caps) | |
self.appsrc.connect("need-data", self.klv_need_data) | |
self.appsrc.connect("enough-data", self.klv_enough_data) | |
self.appsrc.set_property("format", Gst.Format.TIME) | |
#self.appsrc.set_property("is-live", True) | |
# configure encoder | |
# self.encoder.set_property("noise-reduction", 1000) | |
self.encoder.set_property("threads", 4) | |
self.encoder.set_property("bitrate", self.bitrate) | |
self.encoder.set_property("byte-stream", True) | |
# configure filesink | |
out_file = os.path.join(OUT_FOLDER + os.path.basename(self.video_path)) | |
self.filesink.set_property("location", out_file) | |
self.filesink.set_property("async", 0) | |
self.pipeline = Gst.Pipeline() | |
self.pipeline.add(self.vsrc) | |
self.pipeline.add(self.vqueue) | |
self.pipeline.add(self.vtee) | |
self.pipeline.add(self.appsrc) | |
self.pipeline.add(self.queue_klv) | |
self.pipeline.add(self.queue_record) | |
self.pipeline.add(self.vcvt_encoder) | |
self.pipeline.add(self.encoder) | |
self.pipeline.add(self.muxer) | |
self.pipeline.add(self.filesink) | |
# link video elements | |
self.vsrc.link(self.vqueue) | |
self.vqueue.link(self.vtee) | |
# link recording elements | |
self.vtee.link(self.queue_record) | |
self.queue_record.link(self.vcvt_encoder) | |
self.vcvt_encoder.link(self.encoder) | |
self.encoder.link(self.muxer) | |
self.muxer.link(self.filesink) | |
# link klv elements | |
self.appsrc.link(self.queue_klv) | |
self.queue_klv.link(self.muxer) | |
def klv_need_data(self, src, length): | |
print('======================> KLV need data length: %s' % length) | |
#if self.inject_klv >= vid_frame_rate or self.vid_frame_counter==1: | |
# KLV Data | |
klv_bytes = self.fh.read(KLV_PACKET_SIZE) | |
#print(klv_bytes) | |
if(klv_bytes==b''): | |
print("End klv stream") | |
self.appsrc.emit("end-of-stream") | |
klvbuf = Gst.Buffer.new_allocate(None, KLV_PACKET_SIZE, None) | |
klvbuf.fill(0, klv_bytes) | |
klvbuf.duration = Gst.SECOND | |
klvbuf.pts = klvbuf.dts = self.dts | |
#klvbuf.offset = self.dts | |
retval = self.appsrc.emit("push-buffer", klvbuf) | |
if retval != Gst.FlowReturn.OK: | |
print(retval) | |
self.klv_frame_counter += 1 | |
self.inject_klv = 0 | |
print("klv Frame {}".format(self.klv_frame_counter)) | |
def klv_enough_data(self, src): | |
print('======================> KLV enough data src: %s' % src) | |
def video_need_data(self, src, length): | |
#print('======================> Video need data length: %s' % length) | |
#print('======================> Video need data src: %s' % src) | |
# Get Image | |
##### TEMPORARY FIX ####### | |
img_folder = '.mux/img/' | |
########################### | |
tmp_image =img_folder + str(self.vid_frame_counter)+ ".jpg" | |
# print(tmp_image) | |
if os.path.isfile(tmp_image): | |
vid_frame = plt.imread(tmp_image) | |
data = vid_frame.tostring() | |
vidbuf = Gst.Buffer.new_allocate(None, len(data), None) | |
vidbuf.fill(0, data) | |
#print("Duration video frame {}".format(self.duration/ 1000.0)) | |
#timestamp = (self.vid_frame_counter - 1) * self.duration | |
#print("TimeStamp video frame {}".format(timestamp/ 1000.0)) | |
vidbuf.duration = self.duration | |
vidbuf.pts = vidbuf.dts = self.dts | |
#vidbuf.offset = self.dts | |
retval = self.vsrc.emit("push-buffer", vidbuf) | |
print("video frame {}".format(self.vid_frame_counter)) | |
if retval != Gst.FlowReturn.OK: | |
print(retval) | |
self.vid_frame_counter += 1 | |
self.inject_klv +=1 | |
# Up dts | |
self.dts = self.dts + self.duration | |
else: | |
print("End Video and KLV Stream") | |
# Video Stream | |
self.vsrc.emit("end-of-stream") | |
# KLV Stream | |
self.appsrc.emit("end-of-stream") | |
def video_enough_data(self, src: str) -> None: | |
print('======================> Video enough data src: %s' % src) | |
def play(self) -> None: | |
ret = self.pipeline.set_state(Gst.State.PLAYING) | |
if ret == Gst.StateChangeReturn.FAILURE: | |
raise Exception("Unable to set the pipeline to the playing state") | |
self.bus = self.pipeline.get_bus() | |
while True: | |
msg = self.bus.poll(Gst.MessageType.ANY, Gst.CLOCK_TIME_NONE) | |
t = msg.type | |
if t == Gst.MessageType.EOS: | |
print("EOS") | |
break | |
self.pipeline.set_state(Gst.State.NULL) | |
elif t == Gst.MessageType.ERROR: | |
err, debug = msg.parse_error() | |
print("Error: %s" % err, debug) | |
break | |
elif t == Gst.MessageType.WARNING: | |
err, debug = msg.parse_warning() | |
print("Warning: %s" % err, debug) | |
elif t == Gst.MessageType.STATE_CHANGED: | |
pass | |
elif t == Gst.MessageType.STREAM_STATUS: | |
pass | |
elif t in (Gst.MessageType.LATENCY, Gst.MessageType.NEW_CLOCK): | |
print("Warning: %s" % msg.src) | |
else: | |
pass | |
print("Unknown message: %s" % msg.src, msg.type) | |
self.pipeline.set_state(Gst.State.NULL) | |
logging.info("Wohooo!MISB Created") | |
def extract_video_frames(self) -> bool: | |
f = self.video_path.split("/")[-1].split(".")[0] | |
output_dir = os.path.join(self._video_frames_dir, f) | |
print(output_dir) | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
command = [ | |
"ffmpeg", | |
"-i", | |
self.video_path, | |
"-r", | |
"30/1", | |
os.path.join(output_dir, "%d.jpg"), | |
] | |
res = subprocess.run(command, capture_output=True, text=True) | |
if self._verbose: | |
print(res.stdout, res.stderr) | |
return res.returncode == 0 # True if successful, False otherwise | |
def get_video_size(self) -> tuple[int, int]: | |
command = [ | |
"ffprobe", | |
"-v", | |
"error", | |
"-select_streams", | |
"v:0", | |
"-show_entries", | |
"stream=width,height", | |
"-of", | |
"csv=s=x:p=0", | |
self.video_path, | |
] | |
res = ( | |
subprocess.run(command, shell=True, capture_output=True, text=True) | |
.stdout.strip() | |
.split("x") | |
) | |
self.width = int(res[0]) | |
self.height = int(res[1]) | |
return (self.width, self.height) | |
def get_bitrate(self) -> float: | |
command = [ | |
"ffprobe", | |
"-v", | |
"error", | |
"-select_streams", | |
"v:0", | |
"-show_entries", | |
"stream=bit_rate", | |
"-of", | |
"csv=p=0", | |
self.video_path, | |
] | |
res = subprocess.run(command, capture_output=True, text=True) | |
if res.returncode == 0: | |
self.bitrate = float(res.stdout.strip()) / 1000 | |
else: | |
raise Exception("Unable to obtain bitrate") | |
return self.bitrate | |
def get_frame_rate(self) -> float: | |
command = [ | |
"ffprobe", | |
"-v", | |
"error", | |
"-select_streams", | |
"v", | |
"-of", | |
"default=noprint_wrappers=1:nokey=1", | |
"-show_entries", | |
"stream=r_frame_rate", | |
self.video_path, | |
] | |
res = subprocess.run(command, capture_output=True, text=True) | |
if res.returncode == 0: | |
self.frame_rate = float(fractions.Fraction(res.stdout.strip())) | |
else: | |
raise Exception("Cannot find video frame rate") | |
return self.frame_rate | |
def write_to_klv(self, out_record: str) -> None: | |
logging.info('◌ Writing to klv') | |
HFOV = 81 | |
VFOV = 66 | |
root = os.getcwd() | |
dir = os.path.join(root, out_record) | |
if not os.path.exists(dir): | |
os.makedirs(dir) | |
d = {} | |
count = 0 | |
with open(out_record, encoding=CSV_ENCODING) as csvfile: | |
reader = csv.DictReader(csvfile) | |
for row in reader: | |
date_start = datetime.strptime( | |
row["CUSTOM.updateTime"], "%Y/%m/%d %H:%M:%S.%f" | |
) | |
break | |
with open(out_record, encoding=CSV_ENCODING) as csvfile: | |
reader = csv.DictReader(csvfile) | |
for row in reader: | |
for k in row: | |
stripK = k.strip() | |
stripV = row[k].strip() | |
d[stripK] = stripV | |
# We create the klv file for every moment | |
bufferData = b"" | |
# count = 0 | |
for k, v in d.items(): | |
try: | |
if k == "CUSTOM.updateTime": | |
# Handle times that don't have milliseconds | |
try: | |
date_end = datetime.strptime(v, "%Y/%m/%d %H:%M:%S.%f") | |
except Exception: | |
date_end = datetime.strptime(v, "%Y/%m/%d %H:%M:%S") | |
_bytes = bytes( | |
PrecisionTimeStamp(datetime_to_bytes(date_end)) | |
) | |
bufferData += _bytes | |
if k == "OSD.yaw": | |
OSD_yaw = float(v) | |
if OSD_yaw < 0: | |
OSD_yaw = OSD_yaw + 360 | |
_bytes = bytes(PlatformHeadingAngle(OSD_yaw)) | |
bufferData += _bytes | |
if k == "OSD.pitch": | |
OSD_pitch = float(v) | |
_bytes = bytes(PlatformPitchAngle(OSD_pitch)) | |
bufferData += _bytes | |
if k == "OSD.roll": | |
OSD_roll = float(v) | |
_bytes = bytes(PlatformRollAngle(OSD_roll)) | |
bufferData += _bytes | |
if k == "OSD.latitude": | |
OSD_latitude = float(v) | |
_bytes = bytes(SensorLatitude(OSD_latitude)) | |
bufferData += _bytes | |
if k == "OSD.longitude": | |
OSD_longitude = float(v) | |
_bytes = bytes(SensorLongitude(OSD_longitude)) | |
bufferData += _bytes | |
if k == "OSD.altitude [m]": | |
OSD_altitude = float(v) | |
_bytes = bytes(SensorTrueAltitude(OSD_altitude)) | |
bufferData += _bytes | |
if k == "OSD.height [m]": | |
OSD_height = float(v) | |
_bytes = bytes(SensorEllipsoidHeightConversion(OSD_height)) | |
bufferData += _bytes | |
if k == "GIMBAL.yaw": | |
# GIMBAL_yaw = float(v) | |
GIMBAL_yaw = 0.0 | |
_bytes = bytes(SensorRelativeAzimuthAngle(GIMBAL_yaw)) | |
bufferData += _bytes | |
if k == "GIMBAL.pitch": | |
GIMBAL_pitch = float(v) | |
_bytes = bytes(SensorRelativeElevationAngle(GIMBAL_pitch)) | |
bufferData += _bytes | |
if k == "GIMBAL.roll": | |
GIMBAL_roll = float(v) | |
_bytes = bytes(SensorRelativeRollAngle(GIMBAL_roll)) | |
bufferData += _bytes | |
except Exception as e: | |
count += 1 | |
# logging.error(f'Error at line {e.__traceback__.tb_lineno}') # type:ignore | |
continue | |
try: | |
td = date_end - date_start # td: Time Difference | |
n = self.log_path.split("/")[-1].split(".")[0] | |
end_path = os.path.join(root, KLV_FOLDER, n) + "\\%.2f.klv" % ( | |
round(td.total_seconds(), 1) | |
) | |
if not os.path.exists(os.path.join(root, KLV_FOLDER, n)): | |
os.makedirs(os.path.join(root, KLV_FOLDER, n)) | |
# CheckSum | |
v = abs(hash(end_path)) % (10**4) | |
_bytes = bytes(Checksum(v)) | |
bufferData += _bytes | |
# Sensor Horizontal Field of View | |
v = HFOV | |
_bytes = bytes(SensorHorizontalFieldOfView(float(v))) | |
bufferData += _bytes | |
# Sensor Vertical Field of View | |
v = VFOV | |
_bytes = bytes(SensorVerticalFieldOfView(float(v))) | |
bufferData += _bytes | |
# TODO : Check these calculations | |
# Slant Range | |
angle = 180 + (OSD_pitch + GIMBAL_pitch) | |
slantRange = abs(OSD_altitude / (cos(radians(angle)))) | |
_bytes = bytes(SlantRange(slantRange)) | |
bufferData += _bytes | |
# Target Width | |
targetWidth = 2.0 * slantRange * tan(radians(HFOV / 2.0)) | |
try: | |
_bytes = bytes(TargetWidth(targetWidth)) | |
except Exception: | |
_bytes = bytes(TargetWidth(0.0)) | |
bufferData += _bytes | |
# Frame Center Latitude | |
angle = 90 + (OSD_pitch + GIMBAL_pitch) | |
tgHzDist = OSD_altitude * tan(radians(angle)) | |
dy = tgHzDist * cos(radians(OSD_yaw)) | |
framecenterlatitude = OSD_latitude + degrees( | |
(dy / EARTH_MEAN_RADIUS) | |
) | |
_bytes = bytes(FrameCenterLatitude(framecenterlatitude)) | |
bufferData += _bytes | |
# Frame Center Longitude | |
dx = tgHzDist * sin(radians(OSD_yaw)) | |
framecenterlongitude = OSD_longitude + degrees( | |
(dx / EARTH_MEAN_RADIUS) | |
) / cos(radians(OSD_latitude)) | |
_bytes = bytes(FrameCenterLongitude(framecenterlongitude)) | |
bufferData += _bytes | |
# Frame Center Elevation | |
frameCenterElevation = 0.0 | |
_bytes = bytes(FrameCenterElevation(frameCenterElevation)) | |
bufferData += _bytes | |
# CALCULATE CORNERS COORDINATES | |
# FIXME : If we add this values, the klv parse has a overflow | |
# Probably the packets is not created correctly | |
sensor = (OSD_longitude, OSD_latitude, OSD_altitude) | |
frameCenter = (framecenterlongitude, framecenterlatitude, frameCenterElevation) | |
FOV = (VFOV, HFOV) | |
others = (OSD_yaw, GIMBAL_yaw, targetWidth, slantRange) | |
# cornerPointUL, cornerPointUR, cornerPointLR, cornerPointLL = CornerEstimationWithoutOffsets(sensor=sensor, frameCenter=frameCenter, FOV=FOV, others=others) | |
# # Corner Latitude Point 1 (Full) CornerLatitudePoint1Full | |
# _bytes = bytes(CornerLatitudePoint1Full(cornerPointUL[0])) | |
# bufferData += _bytes | |
# # Corner Longitude Point 1 (Full) | |
# _bytes = bytes(CornerLongitudePoint1Full(cornerPointUL[1])) | |
# bufferData += _bytes | |
# # Corner Latitude Point 2 (Full) | |
# _bytes = bytes(CornerLatitudePoint2Full(cornerPointUR[0])) | |
# bufferData += _bytes | |
# # Corner Longitude Point 2 (Full) | |
# _bytes = bytes(CornerLongitudePoint2Full(cornerPointUR[1])) | |
# bufferData += _bytes | |
# # Corner Latitude Point 3 (Full) | |
# _bytes = bytes(CornerLatitudePoint3Full(cornerPointLR[0])) | |
# bufferData += _bytes | |
# # Corner Longitude Point 3 (Full) | |
# _bytes = bytes(CornerLongitudePoint3Full(cornerPointLR[1])) | |
# bufferData += _bytes | |
# # Corner Latitude Point 4 (Full) | |
# _bytes = bytes(CornerLatitudePoint4Full(cornerPointLL[0])) | |
# bufferData += _bytes | |
# # Corner Longitude Point 4 (Full) | |
# _bytes = bytes(CornerLongitudePoint4Full(cornerPointLL[1])) | |
bufferData += _bytes | |
_bytes = bytes(PlatformPitchAngleFull(OSD_pitch)) | |
bufferData += _bytes | |
_bytes = bytes(PlatformRollAngleFull(OSD_roll)) | |
bufferData += _bytes | |
# set packet header | |
writeData = UASLocalMetadataSet | |
sizeTotal = len(bufferData) | |
writeData += int_to_bytes(sizeTotal) | |
writeData += bufferData | |
# Write packet | |
f_write = open(end_path, "wb+") | |
f_write.write(writeData) | |
f_write.close() | |
# count += 1 | |
except Exception as e: | |
count += 1 | |
# logging.error(f'Error writing packet: {str(e)} on line {e.__traceback__.tb_lineno} count: {count}') # type:ignore | |
logging.info(f'✓ Wrote to klv with {count} issues') | |
def generate_klv(self) -> str: | |
if not os.path.exists(CSV_FOLDER): | |
os.makedirs(CSV_FOLDER) | |
out_path = ( | |
CSV_FOLDER + "\\" + self.log_path.split("/")[-1].split(".")[0] + ".csv" | |
) | |
cmd = [".\\TxtLogToCSVtool.exe", self.log_path, out_path] | |
subprocess.run(cmd, shell=True) | |
self.write_to_klv(out_path) | |
logging.info('◌ Compiling klv') | |
filename = os.path.splitext(os.path.basename(self.log_path))[0] | |
glob_path = os.path.join( | |
os.getcwd(), KLV_FOLDER, glob.escape(filename), "*.klv" | |
) | |
files = glob.glob(glob_path) | |
out_data = b"" | |
for fn in files: | |
with open(fn, "rb") as fp: | |
out_data += fp.read() | |
with open(f".mux/klv_output/{filename}/all.klv", "wb") as fp: | |
fp.write(out_data) | |
return f".mux/klv_output/{filename}/all.klv" | |
def __str__(self): | |
return ( | |
f"Multiplexor Object:\n" | |
f" Log Path : {self.video_path}\n" | |
f" Video Path : {self.log_path}\n" | |
f" Bitrate : {self.bitrate} kbps\n" | |
f" Frame Rate : {self.frame_rate} fps\n" | |
f" Video Size : {self.video_size[0]}x{self.video_size[1]}\n" | |
) | |
if __name__ == "__main__": | |
root = tk.Tk() | |
root.withdraw() | |
log_path = filedialog.askopenfilename( | |
title="Select Log File", | |
filetypes=[("Text Files", "*.txt")] | |
) | |
video_path = filedialog.askopenfilename(title="Select Video File") | |
mux = Multiplexor( | |
log_path, | |
video_path, | |
_verbose=False, | |
) | |
mux.play() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment