Last active
November 8, 2024 19:17
-
-
Save edxmorgan/d52c7d3a64a874fc836d48f2c1c2ec35 to your computer and use it in GitHub Desktop.
BLUEROV_CAMERA VIDEO STREAM
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import cv2 | |
import gi | |
import numpy as np | |
import json | |
gi.require_version('Gst', '1.0') | |
from gi.repository import Gst | |
class Video():
    """BlueRov video capture class.

    Builds a GStreamer pipeline that reads an RTP/H.264 stream from a UDP
    port, converts it to BGR and hands every decoded frame to an appsink
    callback, which stores the most recent frame for display with OpenCV.

    Attributes:
        port (int): Video UDP port
        closed (bool): True once destroy_node() has released the viewer
        font (int): OpenCV font constant kept for GUI drawing
        video_codec (string): Source h264 parser
        video_decode (string): Transform YUV (12bits) to BGR (24bits)
        video_pipe (object): GStreamer top-level pipeline
        video_sink (object): Gstreamer sink element
        video_sink_conf (string): Sink configuration
        video_source (string): Udp source ip and port
    """

    def __init__(self, port=5600):
        """Configure the pipeline description and start capturing.

        Args:
            port (int, optional): UDP port the video stream arrives on
        """
        # This class has no base class, so object.__init__ must be called
        # without arguments (passing "video" raised TypeError).
        super().__init__()

        self.port = port
        self._frame = None
        self.closed = False

        self.video_source = 'udpsrc port={}'.format(self.port)
        self.video_codec = '! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264'
        self.video_decode = '! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert'
        self.video_sink_conf = '! appsink emit-signals=true sync=false max-buffers=2 drop=true'

        self.video_pipe = None
        self.video_sink = None

        # font
        self.font = cv2.FONT_HERSHEY_PLAIN

        # Gst.init takes the argv list as its argument; None means "no args".
        Gst.init(None)

        self.run()

    def start_gst(self, config=None):
        """ Start gstreamer pipeline and sink

        Pipeline description list e.g:
            [
                'videotestsrc ! decodebin', \
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink'
            ]

        Args:
            config (list, optional): Gstreamer pipeline description list
        """
        if not config:
            config = [
                'videotestsrc ! decodebin',
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink',
            ]

        command = ' '.join(config)
        self.video_pipe = Gst.parse_launch(command)
        self.video_pipe.set_state(Gst.State.PLAYING)
        # The sink is looked up by the name 'appsink0'; the pipeline
        # description must therefore yield an element with that name.
        self.video_sink = self.video_pipe.get_by_name('appsink0')

    @staticmethod
    def _bytes_to_frame(data, height, width):
        """Build a (height, width, 3) uint8 image from raw pixel bytes.

        Rows may carry trailing padding bytes (row stride larger than
        ``width * 3``); such padding is stripped. Any other layout is handed
        to numpy unchanged, exactly as a plain packed buffer.

        Args:
            data (bytes): Raw 3-bytes-per-pixel image data
            height (int): Image height in pixels
            width (int): Image width in pixels

        Returns:
            numpy.ndarray: Image of shape (height, width, 3), dtype uint8
        """
        row_bytes = width * 3
        if height > 0:
            stride, leftover = divmod(len(data), height)
            if leftover == 0 and stride > row_bytes:
                # Padded rows: view as (height, stride) and drop the padding.
                rows = np.frombuffer(data, dtype=np.uint8).reshape(height, stride)
                packed = np.ascontiguousarray(rows[:, :row_bytes])
                return packed.reshape(height, width, 3)
        return np.ndarray((height, width, 3), buffer=data, dtype=np.uint8)

    @staticmethod
    def gst_to_opencv(sample):
        """Transform byte array into np array

        Args:
            sample (Gst.Sample): Sample pulled from the appsink

        Returns:
            numpy.ndarray: Image of shape (height, width, 3), dtype uint8
        """
        buf = sample.get_buffer()
        structure = sample.get_caps().get_structure(0)
        return Video._bytes_to_frame(
            buf.extract_dup(0, buf.get_size()),
            structure.get_value('height'),
            structure.get_value('width'),
        )

    def frame(self):
        """ Get Frame

        Returns:
            numpy.ndarray or None: Latest frame stored by the sink callback,
            or None when no frame has been received yet
        """
        return self._frame

    def frame_available(self):
        """Check if frame is available

        Returns:
            bool: true if frame is available
        """
        return self._frame is not None

    def run(self):
        """ Start the pipeline and register the callback that updates _frame
        """
        self.start_gst(
            [
                self.video_source,
                self.video_codec,
                self.video_decode,
                self.video_sink_conf,
            ])

        self.video_sink.connect('new-sample', self.callback)

    def callback(self, sink):
        """Store the newest sample delivered by the appsink as a frame.

        Args:
            sink (object): Appsink element that emitted 'new-sample'

        Returns:
            Gst.FlowReturn: Always Gst.FlowReturn.OK
        """
        sample = sink.emit('pull-sample')
        if sample is None:
            # Nothing could be pulled; keep the previous frame.
            return Gst.FlowReturn.OK

        self._frame = self.gst_to_opencv(sample)
        return Gst.FlowReturn.OK

    def update(self):
        """Show the latest frame; pressing 'q' in the window closes the viewer.

        Does nothing when no frame is available or after the viewer has been
        closed with destroy_node().
        """
        if self.closed or not self.frame_available():
            return

        frame = self.frame()
        width = int(1920/1.5)
        height = int(1080/1.5)
        img = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)

        self.draw_gui(img, width, height)

        cv2.imshow('BlueROV2 Camera', img)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            self.destroy_node()

    def destroy_node(self):
        """Stop the pipeline, close the OpenCV windows and mark as closed."""
        if self.video_pipe is not None:
            self.video_pipe.set_state(Gst.State.NULL)
            self.video_pipe = None
            self.video_sink = None
        cv2.destroyAllWindows()
        self.closed = True

    def draw_gui(self, img, width, height):
        """Draw a filled black box in the bottom-left corner of img, in place.

        Args:
            img (numpy.ndarray): Image to draw on
            width (int): Image width in pixels (currently unused)
            height (int): Image height in pixels
        """
        cv2.rectangle(img, (0, height-100), (520, height), (0, 0, 0), -1)
def main(args=None):
    """Create the video viewer and refresh it until it reports closed.

    Args:
        args (optional): Unused; kept for signature compatibility
    """
    node = Video()

    # update must actually be called (the bare attribute access was a no-op).
    # Stop looping once the viewer exposes a truthy ``closed`` flag; viewers
    # without such a flag keep running as before.
    while not getattr(node, 'closed', False):
        node.update()
# Script entry point: start the viewer only when run directly, not on import.
if "__main__" == __name__:
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment