|
# -*- coding: utf-8 -*- |
|
import matplotlib.pyplot as plt |
|
import os |
|
import gi |
|
|
|
# Author : Fran Raga , 2021 |
|
# proof of concept to ingest KLV telemetry into a video. Multiplexer concept to create a MISB Video. |
|
# Related with : https://github.com/All4Gis/QGISFMV/blob/master/code/manager/QgsMultiplexor.py |
|
|
|
# Get Video width/Height |
|
# ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of csv=s=x:p=0 DJI_0047.MP4 |
|
# width 3840x height 2160 |
|
|
|
# Extract all frames in folder |
|
# ffmpeg -i DJI_0047.mp4 -r 30/1 '%d.jpg' |
|
|
|
# Get Bitrate |
|
# ffprobe -v error -select_streams v:0 -show_entries stream=bit_rate -of default=noprint_wrappers=1:nokey=1 DJI_0047.mp4 |
|
# ffprobe -v error -select_streams v:0 -show_entries stream=bit_rate -of csv=p=0 DJI_0047.mp4 |
|
# result/ 1000 |
|
|
|
# Get Frame rate |
|
# ffprobe -v error -select_streams v -of default=noprint_wrappers=1:nokey=1 -show_entries stream=r_frame_rate DJI_0047.MP4 |
|
|
|
# Merge all klv from QGISFMV to use in this code |
|
# import glob |
|
|
|
# Fix Video result |
|
# ffmpeg -re -i MISB.mp4 -map 0:v -map 0:d -codec copy fixed.ts |
|
# ffmpeg -i fixed.ts -map 0:v -muxpreload 0 -muxdelay 0 -map 0:d -codec copy newFile.ts |
|
|
|
# |
|
# files = [] |
|
# for file in glob.glob("/home/fragalop/SamplesFMV/multiplexor/klv/*.klv"): |
|
# files.append(file) |
|
# |
|
# out_data = b'' |
|
# for fn in files: |
|
# with open(fn, 'rb') as fp: |
|
# out_data += fp.read() |
|
# with open('/home/fragalop/SamplesFMV/multiplexor/all.klv', 'wb') as fp: |
|
# fp.write(out_data) |
|
|
|
gi.require_version('Gst', '1.0') |
|
from gi.repository import Gst, GObject |
|
|
|
#vid_file = "/home/fragalop/SamplesFMV/multiplexor/DJI_0047/DJI_0047.mp4" |
|
vid_frame_rate = 30 |
|
vid_width = 3840 |
|
vid_height = 2160 |
|
vid_bitrate = 60012 |
|
klv_file = "/home/fragalop/SamplesFMV/multiplexor/all.klv" |
|
img_folder ="/home/fragalop/SamplesFMV/multiplexor/img/" |
|
klv_packet_size = 112 # Extracted from QGISFMV "Create MISB Tool" |
|
out_file = "/home/fragalop/SamplesFMV/multiplexor/MISB.ts" |
|
|
|
|
|
class muxKlv: |
|
def __init__(self): |
|
self.is_push_buffer_allowed = None |
|
|
|
GObject.threads_init() |
|
Gst.init(None) |
|
|
|
self.cap = None |
|
self.fh = open(klv_file, 'rb') |
|
self.vid_frame_counter = 1 |
|
self.klv_frame_counter = 0 |
|
|
|
self.inject_klv = 0 |
|
self.duration = 1.0 / vid_frame_rate * Gst.SECOND |
|
self.dts = 0 |
|
self.fps = int(vid_frame_rate) |
|
# Create GStreamer Pipeline |
|
self.createGstPipeline() |
|
|
|
|
|
def createGstPipeline(self): |
|
|
|
# video source elements |
|
self.vsrc = Gst.ElementFactory.make("appsrc", "vidsrc") |
|
self.vqueue = Gst.ElementFactory.make("queue") |
|
self.vtee = Gst.ElementFactory.make("tee") |
|
|
|
# klv source elements |
|
self.appsrc = Gst.ElementFactory.make("appsrc") |
|
self.queue_klv = Gst.ElementFactory.make("queue") |
|
|
|
# recording elements |
|
self.queue_record = Gst.ElementFactory.make("queue") |
|
self.vcvt_encoder = Gst.ElementFactory.make("videoconvert") |
|
# encode |
|
self.encoder = Gst.ElementFactory.make("x264enc") |
|
self.muxer = Gst.ElementFactory.make("mpegtsmux") |
|
self.filesink = Gst.ElementFactory.make("filesink") |
|
|
|
# configure video element |
|
self.caps_str = "video/x-raw" |
|
self.caps_str += ",format=(string)RGB,width={},height={}".format(vid_width,vid_height) |
|
self.caps_str += ",framerate={}/1".format(int(vid_frame_rate)) |
|
self.vcaps = Gst.Caps.from_string(self.caps_str) |
|
self.vsrc.set_property("caps", self.vcaps); |
|
self.vsrc.set_property("format", Gst.Format.TIME) |
|
#self.vsrc.set_property("is-live", True) |
|
|
|
self.vsrc.connect("need-data", self.video_need_data) |
|
self.vsrc.connect("enough-data", self.video_enough_data) |
|
|
|
# configure appsrc element |
|
self.caps_str = "meta/x-klv" |
|
self.caps_str += ",parsed=True" |
|
self.caps = Gst.Caps.from_string(self.caps_str) |
|
self.appsrc.set_property("caps", self.caps) |
|
self.appsrc.connect("need-data", self.klv_need_data) |
|
self.appsrc.connect("enough-data", self.klv_enough_data) |
|
self.appsrc.set_property("format", Gst.Format.TIME) |
|
#self.appsrc.set_property("is-live", True) |
|
|
|
# configure encoder |
|
# self.encoder.set_property("noise-reduction", 1000) |
|
self.encoder.set_property("threads", 4) |
|
self.encoder.set_property("bitrate", vid_bitrate) |
|
self.encoder.set_property("byte-stream", True) |
|
|
|
# configure filesink |
|
self.filesink.set_property("location", out_file) |
|
self.filesink.set_property("async", 0) |
|
|
|
self.pipeline = Gst.Pipeline() |
|
self.pipeline.add(self.vsrc) |
|
self.pipeline.add(self.vqueue) |
|
self.pipeline.add(self.vtee) |
|
self.pipeline.add(self.appsrc) |
|
self.pipeline.add(self.queue_klv) |
|
self.pipeline.add(self.queue_record) |
|
self.pipeline.add(self.vcvt_encoder) |
|
self.pipeline.add(self.encoder) |
|
self.pipeline.add(self.muxer) |
|
self.pipeline.add(self.filesink) |
|
|
|
# link video elements |
|
self.vsrc.link(self.vqueue) |
|
self.vqueue.link(self.vtee) |
|
|
|
# link recording elements |
|
self.vtee.link(self.queue_record) |
|
self.queue_record.link(self.vcvt_encoder) |
|
self.vcvt_encoder.link(self.encoder) |
|
self.encoder.link(self.muxer) |
|
self.muxer.link(self.filesink) |
|
|
|
# link klv elements |
|
self.appsrc.link(self.queue_klv) |
|
self.queue_klv.link(self.muxer) |
|
|
|
def klv_need_data(self, src, length): |
|
print('======================> KLV need data length: %s' % length) |
|
#if self.inject_klv >= vid_frame_rate or self.vid_frame_counter==1: |
|
|
|
# KLV Data |
|
klv_bytes = self.fh.read(klv_packet_size) |
|
#print(klv_bytes) |
|
if(klv_bytes==b''): |
|
print("End klv stream") |
|
self.appsrc.emit("end-of-stream") |
|
|
|
klvbuf = Gst.Buffer.new_allocate(None, klv_packet_size, None) |
|
klvbuf.fill(0, klv_bytes) |
|
klvbuf.duration = Gst.SECOND |
|
klvbuf.pts = klvbuf.dts = self.dts |
|
#klvbuf.offset = self.dts |
|
retval = self.appsrc.emit("push-buffer", klvbuf) |
|
if retval != Gst.FlowReturn.OK: |
|
print(retval) |
|
|
|
self.klv_frame_counter += 1 |
|
self.inject_klv = 0 |
|
print("klv Frame {}".format(self.klv_frame_counter)) |
|
|
|
|
|
def klv_enough_data(self, src): |
|
print('======================> KLV enough data src: %s' % src) |
|
|
|
|
|
def video_need_data(self, src, length): |
|
#print('======================> Video need data length: %s' % length) |
|
#print('======================> Video need data src: %s' % src) |
|
# Get Image |
|
tmp_image =img_folder + str(self.vid_frame_counter)+ ".jpg" |
|
# print(tmp_image) |
|
if os.path.isfile(tmp_image): |
|
vid_frame = plt.imread(tmp_image) |
|
data = vid_frame.tostring() |
|
vidbuf = Gst.Buffer.new_allocate(None, len(data), None) |
|
vidbuf.fill(0, data) |
|
|
|
#print("Duration video frame {}".format(self.duration/ 1000.0)) |
|
#timestamp = (self.vid_frame_counter - 1) * self.duration |
|
#print("TimeStamp video frame {}".format(timestamp/ 1000.0)) |
|
|
|
vidbuf.duration = self.duration |
|
vidbuf.pts = vidbuf.dts = self.dts |
|
#vidbuf.offset = self.dts |
|
|
|
retval = self.vsrc.emit("push-buffer", vidbuf) |
|
|
|
print("video frame {}".format(self.vid_frame_counter)) |
|
|
|
if retval != Gst.FlowReturn.OK: |
|
print(retval) |
|
|
|
self.vid_frame_counter += 1 |
|
self.inject_klv +=1 |
|
# Up dts |
|
self.dts = self.dts + self.duration |
|
|
|
else: |
|
print("End Video and KLV Stream") |
|
# Video Stream |
|
self.vsrc.emit("end-of-stream") |
|
# KLV Stream |
|
self.appsrc.emit("end-of-stream") |
|
|
|
def video_enough_data(self, src): |
|
print('======================> Video enough data src: %s' % src) |
|
|
|
|
|
def play(self): |
|
|
|
ret = self.pipeline.set_state(Gst.State.PLAYING) |
|
|
|
if ret == Gst.StateChangeReturn.FAILURE: |
|
raise Exception("Unable to set the pipeline to the playing state") |
|
|
|
|
|
self.bus = self.pipeline.get_bus() |
|
|
|
while True: |
|
msg = self.bus.poll(Gst.MessageType.ANY, Gst.CLOCK_TIME_NONE) |
|
t = msg.type |
|
if t == Gst.MessageType.EOS: |
|
print("EOS") |
|
break |
|
self.pipeline.set_state(Gst.State.NULL) |
|
elif t == Gst.MessageType.ERROR: |
|
err, debug = msg.parse_error() |
|
print("Error: %s" % err, debug) |
|
break |
|
elif t == Gst.MessageType.WARNING: |
|
err, debug = msg.parse_warning() |
|
print("Warning: %s" % err, debug) |
|
elif t == Gst.MessageType.STATE_CHANGED: |
|
pass |
|
elif t == Gst.MessageType.STREAM_STATUS: |
|
pass |
|
elif t in (Gst.MessageType.LATENCY, Gst.MessageType.NEW_CLOCK): |
|
print("Warning: %s" % msg.src) |
|
else: |
|
pass |
|
print("Unknown message: %s" % msg.src, msg.type) |
|
|
|
self.pipeline.set_state(Gst.State.NULL) |
|
|
|
print("Wohooo!MISB Created") |
|
|
|
|
|
#Test Inject klv data |
|
sender = muxKlv() |
|
sender.play() |
thanks for the information