|
# -*- coding: utf-8 -*- |
|
import matplotlib.pyplot as plt |
|
import os |
|
import gi |
|
|
|
# Author : Fran Raga , 2021 |
|
# proof of concept to ingest KLV telemetry into a video. Multiplexer concept to create a MISB Video. |
|
# Related with : https://github.com/All4Gis/QGISFMV/blob/master/code/manager/QgsMultiplexor.py |
|
|
|
# Get Video width/Height |
|
# ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of csv=s=x:p=0 DJI_0047.MP4 |
|
# width 3840x height 2160 |
|
|
|
# Extract all frames in folder |
|
# ffmpeg -i DJI_0047.mp4 -r 30/1 '%d.jpg' |
|
|
|
# Get Bitrate |
|
# ffprobe -v error -select_streams v:0 -show_entries stream=bit_rate -of default=noprint_wrappers=1:nokey=1 DJI_0047.mp4 |
|
# ffprobe -v error -select_streams v:0 -show_entries stream=bit_rate -of csv=p=0 DJI_0047.mp4 |
|
# result/ 1000 |
|
|
|
# Get Frame rate |
|
# ffprobe -v error -select_streams v -of default=noprint_wrappers=1:nokey=1 -show_entries stream=r_frame_rate DJI_0047.MP4 |
|
|
|
# Merge all klv from QGISFMV to use in this code |
|
# import glob |
|
|
|
# Fix Video result |
|
# ffmpeg -re -i MISB.mp4 -map 0:v -map 0:d -codec copy fixed.ts |
|
# ffmpeg -i fixed.ts -map 0:v -muxpreload 0 -muxdelay 0 -map 0:d -codec copy newFile.ts |
|
|
|
# |
|
# files = [] |
|
# for file in glob.glob("/home/fragalop/SamplesFMV/multiplexor/klv/*.klv"): |
|
# files.append(file) |
|
# |
|
# out_data = b'' |
|
# for fn in files: |
|
# with open(fn, 'rb') as fp: |
|
# out_data += fp.read() |
|
# with open('/home/fragalop/SamplesFMV/multiplexor/all.klv', 'wb') as fp: |
|
# fp.write(out_data) |
|
|
|
gi.require_version('Gst', '1.0') |
|
from gi.repository import Gst, GObject |
|
|
|
#vid_file = "/home/fragalop/SamplesFMV/multiplexor/DJI_0047/DJI_0047.mp4" |
|
vid_frame_rate = 30 |
|
vid_width = 3840 |
|
vid_height = 2160 |
|
vid_bitrate = 60012 |
|
klv_file = "/home/fragalop/SamplesFMV/multiplexor/all.klv" |
|
img_folder ="/home/fragalop/SamplesFMV/multiplexor/img/" |
|
klv_packet_size = 112 # Extracted from QGISFMV "Create MISB Tool" |
|
out_file = "/home/fragalop/SamplesFMV/multiplexor/MISB.ts" |
|
|
|
|
|
class muxKlv: |
|
def __init__(self): |
|
self.is_push_buffer_allowed = None |
|
|
|
GObject.threads_init() |
|
Gst.init(None) |
|
|
|
self.cap = None |
|
self.fh = open(klv_file, 'rb') |
|
self.vid_frame_counter = 1 |
|
self.klv_frame_counter = 0 |
|
|
|
self.inject_klv = 0 |
|
self.duration = 1.0 / vid_frame_rate * Gst.SECOND |
|
self.dts = 0 |
|
self.fps = int(vid_frame_rate) |
|
# Create GStreamer Pipeline |
|
self.createGstPipeline() |
|
|
|
|
|
def createGstPipeline(self): |
|
|
|
# video source elements |
|
self.vsrc = Gst.ElementFactory.make("appsrc", "vidsrc") |
|
self.vqueue = Gst.ElementFactory.make("queue") |
|
self.vtee = Gst.ElementFactory.make("tee") |
|
|
|
# klv source elements |
|
self.appsrc = Gst.ElementFactory.make("appsrc") |
|
self.queue_klv = Gst.ElementFactory.make("queue") |
|
|
|
# recording elements |
|
self.queue_record = Gst.ElementFactory.make("queue") |
|
self.vcvt_encoder = Gst.ElementFactory.make("videoconvert") |
|
# encode |
|
self.encoder = Gst.ElementFactory.make("x264enc") |
|
self.muxer = Gst.ElementFactory.make("mpegtsmux") |
|
self.filesink = Gst.ElementFactory.make("filesink") |
|
|
|
# configure video element |
|
self.caps_str = "video/x-raw" |
|
self.caps_str += ",format=(string)RGB,width={},height={}".format(vid_width,vid_height) |
|
self.caps_str += ",framerate={}/1".format(int(vid_frame_rate)) |
|
self.vcaps = Gst.Caps.from_string(self.caps_str) |
|
self.vsrc.set_property("caps", self.vcaps); |
|
self.vsrc.set_property("format", Gst.Format.TIME) |
|
#self.vsrc.set_property("is-live", True) |
|
|
|
self.vsrc.connect("need-data", self.video_need_data) |
|
self.vsrc.connect("enough-data", self.video_enough_data) |
|
|
|
# configure appsrc element |
|
self.caps_str = "meta/x-klv" |
|
self.caps_str += ",parsed=True" |
|
self.caps = Gst.Caps.from_string(self.caps_str) |
|
self.appsrc.set_property("caps", self.caps) |
|
self.appsrc.connect("need-data", self.klv_need_data) |
|
self.appsrc.connect("enough-data", self.klv_enough_data) |
|
self.appsrc.set_property("format", Gst.Format.TIME) |
|
#self.appsrc.set_property("is-live", True) |
|
|
|
# configure encoder |
|
# self.encoder.set_property("noise-reduction", 1000) |
|
self.encoder.set_property("threads", 4) |
|
self.encoder.set_property("bitrate", vid_bitrate) |
|
self.encoder.set_property("byte-stream", True) |
|
|
|
# configure filesink |
|
self.filesink.set_property("location", out_file) |
|
self.filesink.set_property("async", 0) |
|
|
|
self.pipeline = Gst.Pipeline() |
|
self.pipeline.add(self.vsrc) |
|
self.pipeline.add(self.vqueue) |
|
self.pipeline.add(self.vtee) |
|
self.pipeline.add(self.appsrc) |
|
self.pipeline.add(self.queue_klv) |
|
self.pipeline.add(self.queue_record) |
|
self.pipeline.add(self.vcvt_encoder) |
|
self.pipeline.add(self.encoder) |
|
self.pipeline.add(self.muxer) |
|
self.pipeline.add(self.filesink) |
|
|
|
# link video elements |
|
self.vsrc.link(self.vqueue) |
|
self.vqueue.link(self.vtee) |
|
|
|
# link recording elements |
|
self.vtee.link(self.queue_record) |
|
self.queue_record.link(self.vcvt_encoder) |
|
self.vcvt_encoder.link(self.encoder) |
|
self.encoder.link(self.muxer) |
|
self.muxer.link(self.filesink) |
|
|
|
# link klv elements |
|
self.appsrc.link(self.queue_klv) |
|
self.queue_klv.link(self.muxer) |
|
|
|
def klv_need_data(self, src, length): |
|
print('======================> KLV need data length: %s' % length) |
|
#if self.inject_klv >= vid_frame_rate or self.vid_frame_counter==1: |
|
|
|
# KLV Data |
|
klv_bytes = self.fh.read(klv_packet_size) |
|
#print(klv_bytes) |
|
if(klv_bytes==b''): |
|
print("End klv stream") |
|
self.appsrc.emit("end-of-stream") |
|
|
|
klvbuf = Gst.Buffer.new_allocate(None, klv_packet_size, None) |
|
klvbuf.fill(0, klv_bytes) |
|
klvbuf.duration = Gst.SECOND |
|
klvbuf.pts = klvbuf.dts = self.dts |
|
#klvbuf.offset = self.dts |
|
retval = self.appsrc.emit("push-buffer", klvbuf) |
|
if retval != Gst.FlowReturn.OK: |
|
print(retval) |
|
|
|
self.klv_frame_counter += 1 |
|
self.inject_klv = 0 |
|
print("klv Frame {}".format(self.klv_frame_counter)) |
|
|
|
|
|
def klv_enough_data(self, src): |
|
print('======================> KLV enough data src: %s' % src) |
|
|
|
|
|
def video_need_data(self, src, length): |
|
#print('======================> Video need data length: %s' % length) |
|
#print('======================> Video need data src: %s' % src) |
|
# Get Image |
|
tmp_image =img_folder + str(self.vid_frame_counter)+ ".jpg" |
|
# print(tmp_image) |
|
if os.path.isfile(tmp_image): |
|
vid_frame = plt.imread(tmp_image) |
|
data = vid_frame.tostring() |
|
vidbuf = Gst.Buffer.new_allocate(None, len(data), None) |
|
vidbuf.fill(0, data) |
|
|
|
#print("Duration video frame {}".format(self.duration/ 1000.0)) |
|
#timestamp = (self.vid_frame_counter - 1) * self.duration |
|
#print("TimeStamp video frame {}".format(timestamp/ 1000.0)) |
|
|
|
vidbuf.duration = self.duration |
|
vidbuf.pts = vidbuf.dts = self.dts |
|
#vidbuf.offset = self.dts |
|
|
|
retval = self.vsrc.emit("push-buffer", vidbuf) |
|
|
|
print("video frame {}".format(self.vid_frame_counter)) |
|
|
|
if retval != Gst.FlowReturn.OK: |
|
print(retval) |
|
|
|
self.vid_frame_counter += 1 |
|
self.inject_klv +=1 |
|
# Up dts |
|
self.dts = self.dts + self.duration |
|
|
|
else: |
|
print("End Video and KLV Stream") |
|
# Video Stream |
|
self.vsrc.emit("end-of-stream") |
|
# KLV Stream |
|
self.appsrc.emit("end-of-stream") |
|
|
|
def video_enough_data(self, src): |
|
print('======================> Video enough data src: %s' % src) |
|
|
|
|
|
def play(self): |
|
|
|
ret = self.pipeline.set_state(Gst.State.PLAYING) |
|
|
|
if ret == Gst.StateChangeReturn.FAILURE: |
|
raise Exception("Unable to set the pipeline to the playing state") |
|
|
|
|
|
self.bus = self.pipeline.get_bus() |
|
|
|
while True: |
|
msg = self.bus.poll(Gst.MessageType.ANY, Gst.CLOCK_TIME_NONE) |
|
t = msg.type |
|
if t == Gst.MessageType.EOS: |
|
print("EOS") |
|
break |
|
self.pipeline.set_state(Gst.State.NULL) |
|
elif t == Gst.MessageType.ERROR: |
|
err, debug = msg.parse_error() |
|
print("Error: %s" % err, debug) |
|
break |
|
elif t == Gst.MessageType.WARNING: |
|
err, debug = msg.parse_warning() |
|
print("Warning: %s" % err, debug) |
|
elif t == Gst.MessageType.STATE_CHANGED: |
|
pass |
|
elif t == Gst.MessageType.STREAM_STATUS: |
|
pass |
|
elif t in (Gst.MessageType.LATENCY, Gst.MessageType.NEW_CLOCK): |
|
print("Warning: %s" % msg.src) |
|
else: |
|
pass |
|
print("Unknown message: %s" % msg.src, msg.type) |
|
|
|
self.pipeline.set_state(Gst.State.NULL) |
|
|
|
print("Wohooo!MISB Created") |
|
|
|
|
|
#Test Inject klv data |
|
sender = muxKlv() |
|
sender.play() |
I'm using this and it seems to work, but at some point during the mux the python process seems to hang indefinitely. I'm not sure what's causing it, there's not tons of information printed but there is a warning. If I kill the process, the partial output video does play correctly, I can see the KLV stream with
ffprobe
and I can pull it out and parse it with theklvdata
python module. So it seems to be working, I just can't figure out why it's hanging randomly.In this example it hangs at frame 60 every time. But I have another ~300MB video that makes it all the way to frame 7000-something before it hangs.