Skip to content

Instantly share code, notes, and snippets.

@jdboachie
Created May 3, 2025 00:45
Show Gist options
  • Save jdboachie/64d11dc4a55f5faccddff44f53c4d7f0 to your computer and use it in GitHub Desktop.
Save jdboachie/64d11dc4a55f5faccddff44f53c4d7f0 to your computer and use it in GitHub Desktop.
import gi # type:ignore
import os
import csv
import time
import glob
import logging
import fractions
import subprocess
import tkinter as tk
import matplotlib.pyplot as plt # type:ignore
from tkinter import filedialog
from datetime import datetime
from constants import (
KLV_PACKET_SIZE,
OUT_FOLDER,
CSV_FOLDER,
KLV_FOLDER,
CSV_ENCODING,
EARTH_MEAN_RADIUS,
VIDEO_FRAMES_FOLDER,
UASLocalMetadataSet,
)
from math import tan, radians, degrees, cos, pi, sin
from klv_data.common import ( # type:ignore
datetime_to_bytes,
int_to_bytes,
)
from klv_data.misb0601 import ( # type:ignore
PlatformHeadingAngle,
PlatformPitchAngle,
SlantRange,
PlatformRollAngle,
SensorLatitude,
FrameCenterElevation,
SensorLongitude,
SensorTrueAltitude,
TargetWidth,
SensorHorizontalFieldOfView,
SensorRelativeElevationAngle,
SensorEllipsoidHeightConversion,
PlatformRollAngleFull,
PlatformPitchAngleFull,
SensorRelativeAzimuthAngle,
SensorVerticalFieldOfView,
PrecisionTimeStamp,
SensorRelativeRollAngle,
FrameCenterLatitude,
Checksum,
FrameCenterLongitude,
CornerLatitudePoint1Full,
CornerLongitudePoint1Full,
CornerLatitudePoint2Full,
CornerLongitudePoint2Full,
CornerLatitudePoint3Full,
CornerLongitudePoint3Full,
CornerLatitudePoint4Full,
CornerLongitudePoint4Full,
)
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject # type:ignore
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname).5s\t%(filename)s\t%(lineno)d\t%(message)s",
datefmt="%H:%M:%S",
)
class Multiplexor:
def __init__(
self,
log_path: str,
video_path: str,
_verbose: bool = False
):
if not os.path.exists(log_path):
raise FileNotFoundError("Log file does not exist")
if not os.path.exists(video_path):
raise FileNotFoundError("Video file does not exist")
self._verbose = _verbose
self.log_path = log_path
self.video_path = video_path
self._video_frames_dir = VIDEO_FRAMES_FOLDER
self.bitrate = self.get_bitrate()
self.frame_rate = self.get_frame_rate() # fps
self.video_size = self.get_video_size()
self.is_push_buffer_allowed = None
GObject.threads_init()
Gst.init(None)
self.cap = None # seems to be unused
klv_file = self.generate_klv()
self.fh = open(klv_file, 'rb')
self.vid_frame_counter = 1
self.klv_frame_counter = 0
self.inject_klv = 0 # Doesn't look like this is used
self.duration = 1.0 / self.frame_rate * Gst.SECOND
self.dts = 0 # looks unused
self.fps = int(self.frame_rate)
# Create GStreamer Pipeline
self.createGstPipeline()
def createGstPipeline(self):
# video source elements
self.vsrc = Gst.ElementFactory.make("appsrc", "vidsrc")
self.vqueue = Gst.ElementFactory.make("queue")
self.vtee = Gst.ElementFactory.make("tee")
# klv source elements
self.appsrc = Gst.ElementFactory.make("appsrc")
self.queue_klv = Gst.ElementFactory.make("queue")
# recording elements
self.queue_record = Gst.ElementFactory.make("queue")
self.vcvt_encoder = Gst.ElementFactory.make("videoconvert")
# encode
self.encoder = Gst.ElementFactory.make("x264enc")
self.muxer = Gst.ElementFactory.make("mpegtsmux")
self.filesink = Gst.ElementFactory.make("filesink")
# configure video element
self.caps_str = "video/x-raw"
self.caps_str += ",format=(string)RGB,width={},height={}".format(self.width, self.height)
self.caps_str += ",framerate={}/1".format(int(self.frame_rate))
self.vcaps = Gst.Caps.from_string(self.caps_str)
self.vsrc.set_property("caps", self.vcaps)
self.vsrc.set_property("format", Gst.Format.TIME)
#self.vsrc.set_property("is-live", True)
self.vsrc.connect("need-data", self.video_need_data)
self.vsrc.connect("enough-data", self.video_enough_data)
# configure appsrc element
self.caps_str = "meta/x-klv"
self.caps_str += ",parsed=True"
self.caps = Gst.Caps.from_string(self.caps_str)
self.appsrc.set_property("caps", self.caps)
self.appsrc.connect("need-data", self.klv_need_data)
self.appsrc.connect("enough-data", self.klv_enough_data)
self.appsrc.set_property("format", Gst.Format.TIME)
#self.appsrc.set_property("is-live", True)
# configure encoder
# self.encoder.set_property("noise-reduction", 1000)
self.encoder.set_property("threads", 4)
self.encoder.set_property("bitrate", self.bitrate)
self.encoder.set_property("byte-stream", True)
# configure filesink
out_file = os.path.join(OUT_FOLDER + os.path.basename(self.video_path))
self.filesink.set_property("location", out_file)
self.filesink.set_property("async", 0)
self.pipeline = Gst.Pipeline()
self.pipeline.add(self.vsrc)
self.pipeline.add(self.vqueue)
self.pipeline.add(self.vtee)
self.pipeline.add(self.appsrc)
self.pipeline.add(self.queue_klv)
self.pipeline.add(self.queue_record)
self.pipeline.add(self.vcvt_encoder)
self.pipeline.add(self.encoder)
self.pipeline.add(self.muxer)
self.pipeline.add(self.filesink)
# link video elements
self.vsrc.link(self.vqueue)
self.vqueue.link(self.vtee)
# link recording elements
self.vtee.link(self.queue_record)
self.queue_record.link(self.vcvt_encoder)
self.vcvt_encoder.link(self.encoder)
self.encoder.link(self.muxer)
self.muxer.link(self.filesink)
# link klv elements
self.appsrc.link(self.queue_klv)
self.queue_klv.link(self.muxer)
def klv_need_data(self, src, length):
print('======================> KLV need data length: %s' % length)
#if self.inject_klv >= vid_frame_rate or self.vid_frame_counter==1:
# KLV Data
klv_bytes = self.fh.read(KLV_PACKET_SIZE)
#print(klv_bytes)
if(klv_bytes==b''):
print("End klv stream")
self.appsrc.emit("end-of-stream")
klvbuf = Gst.Buffer.new_allocate(None, KLV_PACKET_SIZE, None)
klvbuf.fill(0, klv_bytes)
klvbuf.duration = Gst.SECOND
klvbuf.pts = klvbuf.dts = self.dts
#klvbuf.offset = self.dts
retval = self.appsrc.emit("push-buffer", klvbuf)
if retval != Gst.FlowReturn.OK:
print(retval)
self.klv_frame_counter += 1
self.inject_klv = 0
print("klv Frame {}".format(self.klv_frame_counter))
def klv_enough_data(self, src):
print('======================> KLV enough data src: %s' % src)
def video_need_data(self, src, length):
#print('======================> Video need data length: %s' % length)
#print('======================> Video need data src: %s' % src)
# Get Image
##### TEMPORARY FIX #######
img_folder = '.mux/img/'
###########################
tmp_image =img_folder + str(self.vid_frame_counter)+ ".jpg"
# print(tmp_image)
if os.path.isfile(tmp_image):
vid_frame = plt.imread(tmp_image)
data = vid_frame.tostring()
vidbuf = Gst.Buffer.new_allocate(None, len(data), None)
vidbuf.fill(0, data)
#print("Duration video frame {}".format(self.duration/ 1000.0))
#timestamp = (self.vid_frame_counter - 1) * self.duration
#print("TimeStamp video frame {}".format(timestamp/ 1000.0))
vidbuf.duration = self.duration
vidbuf.pts = vidbuf.dts = self.dts
#vidbuf.offset = self.dts
retval = self.vsrc.emit("push-buffer", vidbuf)
print("video frame {}".format(self.vid_frame_counter))
if retval != Gst.FlowReturn.OK:
print(retval)
self.vid_frame_counter += 1
self.inject_klv +=1
# Up dts
self.dts = self.dts + self.duration
else:
print("End Video and KLV Stream")
# Video Stream
self.vsrc.emit("end-of-stream")
# KLV Stream
self.appsrc.emit("end-of-stream")
def video_enough_data(self, src: str) -> None:
print('======================> Video enough data src: %s' % src)
def play(self) -> None:
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
raise Exception("Unable to set the pipeline to the playing state")
self.bus = self.pipeline.get_bus()
while True:
msg = self.bus.poll(Gst.MessageType.ANY, Gst.CLOCK_TIME_NONE)
t = msg.type
if t == Gst.MessageType.EOS:
print("EOS")
break
self.pipeline.set_state(Gst.State.NULL)
elif t == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print("Error: %s" % err, debug)
break
elif t == Gst.MessageType.WARNING:
err, debug = msg.parse_warning()
print("Warning: %s" % err, debug)
elif t == Gst.MessageType.STATE_CHANGED:
pass
elif t == Gst.MessageType.STREAM_STATUS:
pass
elif t in (Gst.MessageType.LATENCY, Gst.MessageType.NEW_CLOCK):
print("Warning: %s" % msg.src)
else:
pass
print("Unknown message: %s" % msg.src, msg.type)
self.pipeline.set_state(Gst.State.NULL)
logging.info("Wohooo!MISB Created")
def extract_video_frames(self) -> bool:
f = self.video_path.split("/")[-1].split(".")[0]
output_dir = os.path.join(self._video_frames_dir, f)
print(output_dir)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
command = [
"ffmpeg",
"-i",
self.video_path,
"-r",
"30/1",
os.path.join(output_dir, "%d.jpg"),
]
res = subprocess.run(command, capture_output=True, text=True)
if self._verbose:
print(res.stdout, res.stderr)
return res.returncode == 0 # True if successful, False otherwise
def get_video_size(self) -> tuple[int, int]:
command = [
"ffprobe",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=width,height",
"-of",
"csv=s=x:p=0",
self.video_path,
]
res = (
subprocess.run(command, shell=True, capture_output=True, text=True)
.stdout.strip()
.split("x")
)
self.width = int(res[0])
self.height = int(res[1])
return (self.width, self.height)
def get_bitrate(self) -> float:
command = [
"ffprobe",
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=bit_rate",
"-of",
"csv=p=0",
self.video_path,
]
res = subprocess.run(command, capture_output=True, text=True)
if res.returncode == 0:
self.bitrate = float(res.stdout.strip()) / 1000
else:
raise Exception("Unable to obtain bitrate")
return self.bitrate
def get_frame_rate(self) -> float:
command = [
"ffprobe",
"-v",
"error",
"-select_streams",
"v",
"-of",
"default=noprint_wrappers=1:nokey=1",
"-show_entries",
"stream=r_frame_rate",
self.video_path,
]
res = subprocess.run(command, capture_output=True, text=True)
if res.returncode == 0:
self.frame_rate = float(fractions.Fraction(res.stdout.strip()))
else:
raise Exception("Cannot find video frame rate")
return self.frame_rate
def write_to_klv(self, out_record: str) -> None:
logging.info('◌ Writing to klv')
HFOV = 81
VFOV = 66
root = os.getcwd()
dir = os.path.join(root, out_record)
if not os.path.exists(dir):
os.makedirs(dir)
d = {}
count = 0
with open(out_record, encoding=CSV_ENCODING) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
date_start = datetime.strptime(
row["CUSTOM.updateTime"], "%Y/%m/%d %H:%M:%S.%f"
)
break
with open(out_record, encoding=CSV_ENCODING) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
for k in row:
stripK = k.strip()
stripV = row[k].strip()
d[stripK] = stripV
# We create the klv file for every moment
bufferData = b""
# count = 0
for k, v in d.items():
try:
if k == "CUSTOM.updateTime":
# Handle times that don't have milliseconds
try:
date_end = datetime.strptime(v, "%Y/%m/%d %H:%M:%S.%f")
except Exception:
date_end = datetime.strptime(v, "%Y/%m/%d %H:%M:%S")
_bytes = bytes(
PrecisionTimeStamp(datetime_to_bytes(date_end))
)
bufferData += _bytes
if k == "OSD.yaw":
OSD_yaw = float(v)
if OSD_yaw < 0:
OSD_yaw = OSD_yaw + 360
_bytes = bytes(PlatformHeadingAngle(OSD_yaw))
bufferData += _bytes
if k == "OSD.pitch":
OSD_pitch = float(v)
_bytes = bytes(PlatformPitchAngle(OSD_pitch))
bufferData += _bytes
if k == "OSD.roll":
OSD_roll = float(v)
_bytes = bytes(PlatformRollAngle(OSD_roll))
bufferData += _bytes
if k == "OSD.latitude":
OSD_latitude = float(v)
_bytes = bytes(SensorLatitude(OSD_latitude))
bufferData += _bytes
if k == "OSD.longitude":
OSD_longitude = float(v)
_bytes = bytes(SensorLongitude(OSD_longitude))
bufferData += _bytes
if k == "OSD.altitude [m]":
OSD_altitude = float(v)
_bytes = bytes(SensorTrueAltitude(OSD_altitude))
bufferData += _bytes
if k == "OSD.height [m]":
OSD_height = float(v)
_bytes = bytes(SensorEllipsoidHeightConversion(OSD_height))
bufferData += _bytes
if k == "GIMBAL.yaw":
# GIMBAL_yaw = float(v)
GIMBAL_yaw = 0.0
_bytes = bytes(SensorRelativeAzimuthAngle(GIMBAL_yaw))
bufferData += _bytes
if k == "GIMBAL.pitch":
GIMBAL_pitch = float(v)
_bytes = bytes(SensorRelativeElevationAngle(GIMBAL_pitch))
bufferData += _bytes
if k == "GIMBAL.roll":
GIMBAL_roll = float(v)
_bytes = bytes(SensorRelativeRollAngle(GIMBAL_roll))
bufferData += _bytes
except Exception as e:
count += 1
# logging.error(f'Error at line {e.__traceback__.tb_lineno}') # type:ignore
continue
try:
td = date_end - date_start # td: Time Difference
n = self.log_path.split("/")[-1].split(".")[0]
end_path = os.path.join(root, KLV_FOLDER, n) + "\\%.2f.klv" % (
round(td.total_seconds(), 1)
)
if not os.path.exists(os.path.join(root, KLV_FOLDER, n)):
os.makedirs(os.path.join(root, KLV_FOLDER, n))
# CheckSum
v = abs(hash(end_path)) % (10**4)
_bytes = bytes(Checksum(v))
bufferData += _bytes
# Sensor Horizontal Field of View
v = HFOV
_bytes = bytes(SensorHorizontalFieldOfView(float(v)))
bufferData += _bytes
# Sensor Vertical Field of View
v = VFOV
_bytes = bytes(SensorVerticalFieldOfView(float(v)))
bufferData += _bytes
# TODO : Check these calculations
# Slant Range
angle = 180 + (OSD_pitch + GIMBAL_pitch)
slantRange = abs(OSD_altitude / (cos(radians(angle))))
_bytes = bytes(SlantRange(slantRange))
bufferData += _bytes
# Target Width
targetWidth = 2.0 * slantRange * tan(radians(HFOV / 2.0))
try:
_bytes = bytes(TargetWidth(targetWidth))
except Exception:
_bytes = bytes(TargetWidth(0.0))
bufferData += _bytes
# Frame Center Latitude
angle = 90 + (OSD_pitch + GIMBAL_pitch)
tgHzDist = OSD_altitude * tan(radians(angle))
dy = tgHzDist * cos(radians(OSD_yaw))
framecenterlatitude = OSD_latitude + degrees(
(dy / EARTH_MEAN_RADIUS)
)
_bytes = bytes(FrameCenterLatitude(framecenterlatitude))
bufferData += _bytes
# Frame Center Longitude
dx = tgHzDist * sin(radians(OSD_yaw))
framecenterlongitude = OSD_longitude + degrees(
(dx / EARTH_MEAN_RADIUS)
) / cos(radians(OSD_latitude))
_bytes = bytes(FrameCenterLongitude(framecenterlongitude))
bufferData += _bytes
# Frame Center Elevation
frameCenterElevation = 0.0
_bytes = bytes(FrameCenterElevation(frameCenterElevation))
bufferData += _bytes
# CALCULATE CORNERS COORDINATES
# FIXME : If we add this values, the klv parse has a overflow
# Probably the packets is not created correctly
sensor = (OSD_longitude, OSD_latitude, OSD_altitude)
frameCenter = (framecenterlongitude, framecenterlatitude, frameCenterElevation)
FOV = (VFOV, HFOV)
others = (OSD_yaw, GIMBAL_yaw, targetWidth, slantRange)
# cornerPointUL, cornerPointUR, cornerPointLR, cornerPointLL = CornerEstimationWithoutOffsets(sensor=sensor, frameCenter=frameCenter, FOV=FOV, others=others)
# # Corner Latitude Point 1 (Full) CornerLatitudePoint1Full
# _bytes = bytes(CornerLatitudePoint1Full(cornerPointUL[0]))
# bufferData += _bytes
# # Corner Longitude Point 1 (Full)
# _bytes = bytes(CornerLongitudePoint1Full(cornerPointUL[1]))
# bufferData += _bytes
# # Corner Latitude Point 2 (Full)
# _bytes = bytes(CornerLatitudePoint2Full(cornerPointUR[0]))
# bufferData += _bytes
# # Corner Longitude Point 2 (Full)
# _bytes = bytes(CornerLongitudePoint2Full(cornerPointUR[1]))
# bufferData += _bytes
# # Corner Latitude Point 3 (Full)
# _bytes = bytes(CornerLatitudePoint3Full(cornerPointLR[0]))
# bufferData += _bytes
# # Corner Longitude Point 3 (Full)
# _bytes = bytes(CornerLongitudePoint3Full(cornerPointLR[1]))
# bufferData += _bytes
# # Corner Latitude Point 4 (Full)
# _bytes = bytes(CornerLatitudePoint4Full(cornerPointLL[0]))
# bufferData += _bytes
# # Corner Longitude Point 4 (Full)
# _bytes = bytes(CornerLongitudePoint4Full(cornerPointLL[1]))
bufferData += _bytes
_bytes = bytes(PlatformPitchAngleFull(OSD_pitch))
bufferData += _bytes
_bytes = bytes(PlatformRollAngleFull(OSD_roll))
bufferData += _bytes
# set packet header
writeData = UASLocalMetadataSet
sizeTotal = len(bufferData)
writeData += int_to_bytes(sizeTotal)
writeData += bufferData
# Write packet
f_write = open(end_path, "wb+")
f_write.write(writeData)
f_write.close()
# count += 1
except Exception as e:
count += 1
# logging.error(f'Error writing packet: {str(e)} on line {e.__traceback__.tb_lineno} count: {count}') # type:ignore
logging.info(f'✓ Wrote to klv with {count} issues')
def generate_klv(self) -> str:
if not os.path.exists(CSV_FOLDER):
os.makedirs(CSV_FOLDER)
out_path = (
CSV_FOLDER + "\\" + self.log_path.split("/")[-1].split(".")[0] + ".csv"
)
cmd = [".\\TxtLogToCSVtool.exe", self.log_path, out_path]
subprocess.run(cmd, shell=True)
self.write_to_klv(out_path)
logging.info('◌ Compiling klv')
filename = os.path.splitext(os.path.basename(self.log_path))[0]
glob_path = os.path.join(
os.getcwd(), KLV_FOLDER, glob.escape(filename), "*.klv"
)
files = glob.glob(glob_path)
out_data = b""
for fn in files:
with open(fn, "rb") as fp:
out_data += fp.read()
with open(f".mux/klv_output/{filename}/all.klv", "wb") as fp:
fp.write(out_data)
return f".mux/klv_output/{filename}/all.klv"
def __str__(self):
return (
f"Multiplexor Object:\n"
f" Log Path : {self.video_path}\n"
f" Video Path : {self.log_path}\n"
f" Bitrate : {self.bitrate} kbps\n"
f" Frame Rate : {self.frame_rate} fps\n"
f" Video Size : {self.video_size[0]}x{self.video_size[1]}\n"
)
if __name__ == "__main__":
root = tk.Tk()
root.withdraw()
log_path = filedialog.askopenfilename(
title="Select Log File",
filetypes=[("Text Files", "*.txt")]
)
video_path = filedialog.askopenfilename(title="Select Video File")
mux = Multiplexor(
log_path,
video_path,
_verbose=False,
)
mux.play()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment