Skip to content

Instantly share code, notes, and snippets.

@edxmorgan
Last active November 8, 2024 19:17
Show Gist options
  • Save edxmorgan/d52c7d3a64a874fc836d48f2c1c2ec35 to your computer and use it in GitHub Desktop.
Save edxmorgan/d52c7d3a64a874fc836d48f2c1c2ec35 to your computer and use it in GitHub Desktop.
BLUEROV_CAMERA VIDEO STREAM
#!/usr/bin/env python3
import cv2
import gi
import numpy as np
import json
gi.require_version('Gst', '1.0')
from gi.repository import Gst
class Video():
"""BlueRov video capture class constructor
Attributes:
port (int): Video UDP port
video_codec (string): Source h264 parser
video_decode (string): Transform YUV (12bits) to BGR (24bits)
video_pipe (object): GStreamer top-level pipeline
video_sink (object): Gstreamer sink element
video_sink_conf (string): Sink configuration
video_source (string): Udp source ip and port
"""
def __init__(self):
super().__init__("video")
self.port = 5600
self._frame = None
self.video_source = 'udpsrc port={}'.format(self.port)
self.video_codec = '! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264'
self.video_decode = '! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert'
self.video_sink_conf = '! appsink emit-signals=true sync=false max-buffers=2 drop=true'
self.video_pipe = None
self.video_sink = None
# font
self.font = cv2.FONT_HERSHEY_PLAIN
Gst.init()
self.run()
def start_gst(self, config=None):
""" Start gstreamer pipeline and sink
Pipeline description list e.g:
[
'videotestsrc ! decodebin', \
'! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
'! appsink'
]
Args:
config (list, optional): Gstreamer pileline description list
"""
if not config:
config = \
[
'videotestsrc ! decodebin',
'! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
'! appsink'
]
command = ' '.join(config)
self.video_pipe = Gst.parse_launch(command)
self.video_pipe.set_state(Gst.State.PLAYING)
self.video_sink = self.video_pipe.get_by_name('appsink0')
@staticmethod
def gst_to_opencv(sample):
"""Transform byte array into np array
Args:
sample (TYPE): Description
Returns:
TYPE: Description
"""
buf = sample.get_buffer()
caps = sample.get_caps()
array = np.ndarray(
(
caps.get_structure(0).get_value('height'),
caps.get_structure(0).get_value('width'),
3
),
buffer=buf.extract_dup(0, buf.get_size()), dtype=np.uint8)
return array
def frame(self):
""" Get Frame
Returns:
iterable: bool and image frame, cap.read() output
"""
return self._frame
def frame_available(self):
"""Check if frame is available
Returns:
bool: true if frame is available
"""
return type(self._frame) != type(None)
def run(self):
""" Get frame to update _frame
"""
self.start_gst(
[
self.video_source,
self.video_codec,
self.video_decode,
self.video_sink_conf
])
self.video_sink.connect('new-sample', self.callback)
def callback(self, sink):
sample = sink.emit('pull-sample')
new_frame = self.gst_to_opencv(sample)
self._frame = new_frame
return Gst.FlowReturn.OK
def update(self):
if not self.frame_available():
return
frame = self.frame()
width = int(1920/1.5)
height = int(1080/1.5)
dim = (width, height)
img = cv2.resize(frame, dim, interpolation = cv2.INTER_AREA)
self.draw_gui(img, width, height)
cv2.imshow('BlueROV2 Camera', img)
if cv2.waitKey(1) & 0xFF == ord('q'):
self.destroy_node()
def draw_gui(self, img, width, height):
img = cv2.rectangle(img,(0, height-100),(520,height),(0,0,0),-1)
def main(args=None):
node = Video()
while True:
node.update
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment