Created
June 27, 2024 23:09
-
-
Save nocarryr/c2c6ef1abaec9cafd1d416db27c9c2b3 to your computer and use it in GitHub Desktop.
cyndilib sender example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from typing import NamedTuple, cast | |
from typing_extensions import Self | |
import enum | |
import io | |
import subprocess | |
import shlex | |
from fractions import Fraction | |
from contextlib import contextmanager | |
import click | |
from cyndilib.wrapper.ndi_structs import FourCC | |
from cyndilib.video_frame import VideoSendFrame | |
from cyndilib.sender import Sender | |
FF_CMD = (
    '{ffmpeg} -f lavfi '
    '-i testsrc2=size={xres}x{yres}:rate={fps} '
    '-pix_fmt {pix_fmt.name} '
    '-f rawvideo pipe: '
)
"""Command-line template for the ffmpeg test-pattern generator.

Formatted with the fields of :class:`Options`; ffmpeg renders the ``testsrc2``
source at the requested size/rate and writes raw frames in the requested
pixel format to a pipe.
"""
class PixFmt(enum.Enum):
    """Pixel formats selectable for the generated video.

    Each member's *name* is the string substituted into the ffmpeg command
    line (``-pix_fmt``); each member's *value* is the
    :class:`FourCC <cyndilib.wrapper.ndi_structs.FourCC>` member set on the
    outgoing video frame.
    """

    uyvy422 = FourCC.UYVY
    nv12 = FourCC.NV12
    rgba = FourCC.RGBA
    bgra = FourCC.BGRA

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Return the member called *name* (raises :class:`KeyError` if unknown)."""
        return cls[name]
class Options(NamedTuple):
    """Settings for the sender script, collected from the command line."""

    # Pixel format of the generated frames
    pix_fmt: PixFmt
    # Horizontal resolution in pixels
    xres: int
    # Vertical resolution in pixels
    yres: int
    # Frame rate kept as text; it is interpolated into the ffmpeg command
    # line as-is and parsed separately for the NDI frame
    fps: str
    # NDI name the sender is created with
    sender_name: str = 'ffmpeg_sender'
    # Name or path of the ffmpeg executable
    ffmpeg: str = 'ffmpeg'
def parse_frame_rate(fr: str) -> Fraction:
    """Convert a frame-rate string into an exact :class:`~fractions.Fraction`.

    Three spellings are accepted:

    * ``"N/D"`` -- an explicit rational such as ``"30000/1001"``.
    * a decimal such as ``"29.97"``, ``"59.94"`` or ``"23.976"``. When the
      value lies within 0.005 of ``round(value) * 1000 / 1001`` it is treated
      as an NTSC rate and that rational is returned. Any other decimal
      (``"25.0"``, ``"12.5"``) is returned as its exact value instead of
      being forced onto the 1001 denominator.
    * a plain integer such as ``"30"``.

    :raises ValueError: if *fr* cannot be parsed
    :raises ZeroDivisionError: if an explicit denominator of zero is given
    """
    if '/' in fr:
        num, den = (int(part) for part in fr.split('/'))
        return Fraction(num, den)

    if '.' in fr:
        exact = Fraction(fr)
        if exact.denominator == 1:
            # Whole number written with a decimal point, e.g. "25.0"
            return exact
        ntsc = Fraction(round(exact) * 1000, 1001)
        # Only snap to the NTSC rational when the text is a rounded form of it
        if abs(exact - ntsc) < Fraction(1, 200):
            return ntsc
        return exact

    return Fraction(int(fr))
def build_video_frame(opts: Options) -> VideoSendFrame:
    """Create the outgoing frame object described by *opts*.

    The returned :class:`VideoSendFrame <cyndilib.video_frame.VideoSendFrame>`
    has its resolution, frame rate and pixel format (FourCC) set from the
    corresponding option fields.
    """
    frame = VideoSendFrame()
    frame.set_resolution(opts.xres, opts.yres)
    # The frame rate arrives as text and is converted to a Fraction first
    frame.set_frame_rate(parse_frame_rate(opts.fps))
    frame.set_fourcc(opts.pix_fmt.value)
    return frame
@contextmanager
def ffmpeg_proc(opts: Options):
    """Context manager running the ffmpeg subprocess that generates frames.

    Yields the :class:`subprocess.Popen` object, whose ``stdout`` is a pipe
    carrying the raw video data. On exit the process is killed, its pipe is
    closed and the child is waited for.

    :raises RuntimeError: if ffmpeg has already exited by the time the
        context is entered
    """
    fmt_args = opts._asdict()
    # Quote the executable so a path containing spaces or backslashes
    # survives the shlex.split() round trip as a single argument.
    fmt_args['ffmpeg'] = shlex.quote(opts.ffmpeg)
    ff_cmd = FF_CMD.format(**fmt_args)

    # Popen's own context manager closes the stdout pipe and reaps the child
    # after the ``finally`` clause below has killed it.
    with subprocess.Popen(shlex.split(ff_cmd), stdout=subprocess.PIPE) as ff_proc:
        try:
            if ff_proc.poll() is not None:
                # Fail with a clear message instead of contextlib's generic
                # "generator didn't yield" RuntimeError.
                raise RuntimeError(
                    f'ffmpeg exited immediately with code {ff_proc.returncode}'
                )
            yield ff_proc
        finally:
            ff_proc.kill()
def send(opts: Options):
    """Generate frames with ffmpeg and send them as an NDI source.

    A single frame-sized buffer is allocated up front; each iteration reads
    one frame from ffmpeg's stdout into it and hands the same buffer to the
    sender. The loop ends once the ffmpeg process is seen to have exited.
    """
    sender = Sender(opts.sender_name)
    vf = build_video_frame(opts)
    sender.set_video_frame(vf)
    frame_size_bytes = vf.get_data_size()
    click.echo(f'{frame_size_bytes=}')

    # Pre-allocate a bytearray to hold frame data and create a view of it
    # so we can buffer into it from ffmpeg then pass directly to the sender
    ba = bytearray(frame_size_bytes)
    mv = memoryview(ba)

    frames_sent = 0
    with sender:
        with ffmpeg_proc(opts) as ff_proc:
            stdout = cast(io.BytesIO, ff_proc.stdout)
            while ff_proc.returncode is None:
                # Read from the ffmpeg process into a view of the bytearray
                num_read = stdout.readinto(mv)
                if not num_read or num_read < frame_size_bytes:
                    # Empty or incomplete read: there is no whole frame to
                    # send. Refresh the exit status here so the loop ends
                    # when ffmpeg has gone away instead of spinning on
                    # end-of-stream forever.
                    ff_proc.poll()
                    continue
                # Pass the memoryview directly to the sender
                # (using the buffer protocol)
                sender.write_video_async(mv)
                frames_sent += 1
                # Checking the process on every frame is unnecessary
                if frames_sent % 10 == 0:
                    ff_proc.poll()
# NOTE: no docstring on purpose -- click would show it as the command's help text
@click.command()
@click.option(
    '--pix-fmt',
    default=PixFmt.uyvy422.name,
    type=click.Choice(choices=[member.name for member in PixFmt]),
    show_choices=True,
    show_default=True,
)
@click.option('-x', '--x-res', default=1920, type=int, show_default=True)
@click.option('-y', '--y-res', default=1080, type=int, show_default=True)
@click.option('--fps', default='30', type=str, show_default=True)
@click.option(
    '-n', '--sender-name',
    default='ffmpeg_sender',
    type=str,
    help='NDI name for the sender',
    show_default=True,
)
@click.option(
    '--ffmpeg',
    default='ffmpeg',
    type=str,
    help='Name/Path of the "ffmpeg" executable',
    show_default=True,
)
def main(pix_fmt: str, x_res: int, y_res: int, fps: str, sender_name: str, ffmpeg: str):
    # Turn the raw command-line values into the typed options tuple
    selected_fmt = PixFmt.from_str(pix_fmt)
    opts = Options(
        pix_fmt=selected_fmt,
        xres=x_res,
        yres=y_res,
        fps=fps,
        sender_name=sender_name,
        ffmpeg=ffmpeg,
    )
    send(opts)


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from typing import NamedTuple | |
from typing_extensions import Self | |
import enum | |
import time | |
import subprocess | |
import shlex | |
import socket | |
from fractions import Fraction | |
from contextlib import contextmanager | |
import click | |
from cyndilib.wrapper.ndi_structs import FourCC | |
from cyndilib.wrapper.ndi_recv import RecvColorFormat, RecvBandwidth | |
from cyndilib.video_frame import VideoFrameSync | |
from cyndilib.receiver import Receiver | |
from cyndilib.finder import Finder, Source | |
FF_PLAY = (
    '{ffplay} '
    '-video_size {xres}x{yres} '
    '-pixel_format {pix_fmt} '
    '-f rawvideo -i pipe:'
)
"""Command-line template for ffplay reading raw video frames from a pipe"""
pix_fmts = {
    FourCC.UYVY: 'uyvy422',
    FourCC.NV12: 'nv12',
    FourCC.RGBA: 'rgba',
    FourCC.BGRA: 'bgra',
    # The "X" variants are given the same names as their alpha counterparts
    FourCC.RGBX: 'rgba',
    FourCC.BGRX: 'bgra',
}
"""Lookup table from :class:`FourCC <cyndilib.wrapper.ndi_structs.FourCC>`
members to the pixel format name passed to ffplay's ``-pixel_format`` option
"""
class RecvFmt(enum.Enum):
    """Receive color formats selectable on the command line.

    Each value is the
    :class:`RecvColorFormat <cyndilib.wrapper.ndi_recv.RecvColorFormat>`
    member passed to the receiver.
    """

    uyvy = RecvColorFormat.UYVY_RGBA
    rgb = RecvColorFormat.RGBX_RGBA
    bgr = RecvColorFormat.BGRX_BGRA

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Return the member called *name* (raises :class:`KeyError` if unknown)."""
        return cls[name]
class Bandwidth(enum.Enum):
    """Receive bandwidth settings selectable on the command line.

    Each value is the
    :class:`RecvBandwidth <cyndilib.wrapper.ndi_recv.RecvBandwidth>`
    member passed to the receiver.
    """

    lowest = RecvBandwidth.lowest
    highest = RecvBandwidth.highest

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Return the member called *name* (raises :class:`KeyError` if unknown)."""
        return cls[name]
class Options(NamedTuple):
    """Settings for the receiver script, collected from the command line."""

    # Name of the NDI source to look for
    sender_name: str = 'ffmpeg_sender'
    # Color format requested from the receiver
    recv_fmt: RecvFmt = RecvFmt.uyvy
    # Bandwidth setting requested from the receiver
    recv_bandwidth: Bandwidth = Bandwidth.highest
    # Name or path of the ffplay executable
    ffplay: str = 'ffplay'
def build_receiver(options: Options, source: Source):
    """Build a receiver for *source* and block until it reports a connection.

    The receiver is created with the color format and bandwidth from
    *options*, given a fresh ``VideoFrameSync`` on its frame-sync, and pointed
    at *source*. The connection state is then checked every half second.

    :returns: the connected receiver
    :raises Exception: if the receiver is still not connected after the
        polling attempts are used up
    """
    receiver = Receiver(
        color_format=options.recv_fmt.value,
        bandwidth=options.recv_bandwidth.value,
    )
    receiver.frame_sync.set_video_frame(VideoFrameSync())
    receiver.set_source(source)
    click.echo(f'connecting to "{source.name}"...')

    # Up to 31 check-then-sleep rounds, followed by one last check
    for _ in range(31):
        if receiver.is_connected():
            break
        time.sleep(.5)
    else:
        if not receiver.is_connected():
            raise Exception('timeout waiting for connection')

    click.echo('connected')
    return receiver
@contextmanager
def get_source(name: str):
    """Locate an NDI source by name with the Finder and yield it

    This is a context manager (use in a :keyword:`with` block) and the block
    must stay open for as long as the source is in use:
    :class:`Finder <cyndilib.finder.Finder>` and its
    :class:`Source <cyndilib.finder.Source>` objects hold references to
    C pointers (``NDIlib_source_t*``) owned by the ``NDIlib_find_instance_t``
    struct. The finder is closed when the block exits.

    If *name* is not among the discovered sources, the full local form
    ``'<HOSTNAME> (<name>)'`` is tried before giving up.

    :raises Exception: if neither form of the name is found
    """
    finder = Finder()
    finder.open()
    try:
        click.echo('waiting for ndi sources...')
        finder.wait_for_sources(10)
        sources: list[str] = finder.get_source_names()
        click.echo(f'{sources=}')

        if name not in sources:
            # If the source exists on the local machine, try its full NDI name
            qualified = f'{socket.gethostname().upper()} ({name})'
            click.echo(f'"{name}" not found. searching for "{qualified}"')
            if qualified not in sources:
                raise Exception('source not found')
            click.echo(f'found source name: "{qualified}"')
            name = qualified

        yield finder.get_source(name)
    finally:
        finder.close()
def wait_for_first_frame(receiver: Receiver):
    """Capture frames until one with a non-zero resolution and data size arrives

    The first few captured frames contain no data, so capturing is repeated
    (sleeping one frame interval between attempts) until a usable one shows
    up. If the receiver stops being connected first, the function simply
    returns without having seen a frame.
    """
    vf = receiver.frame_sync.video_frame
    # One frame period at the rate currently reported by the frame object
    wait_time = float(1 / vf.get_frame_rate())
    click.echo('waiting for frame...')

    while receiver.is_connected():
        receiver.frame_sync.capture_video()
        if min(vf.get_resolution()) <= 0 or vf.get_data_size() <= 0:
            # Still empty; try again after one frame interval
            time.sleep(wait_time)
            continue
        click.echo('have frame')
        return
def play(options: Options):
    """Receive video from the named NDI source and display it with ffplay.

    After connecting and waiting for the first frame, an ffplay subprocess is
    started with the resolution and pixel format read from that frame, and
    each captured frame is written to its stdin. Playback stops when the
    receiver disconnects or ffplay exits; the subprocess is always killed,
    its pipe closed and the child waited for on the way out.
    """
    # Get the NDI source and keep the Finder open until exit
    with get_source(options.sender_name) as source:
        receiver = build_receiver(options, source)
        vf = receiver.frame_sync.video_frame
        proc: subprocess.Popen|None = None
        try:
            wait_for_first_frame(receiver)

            # At this point we should have received a frame, so the pixel
            # format, resolution and frame rate should be populated.
            fourcc = vf.get_fourcc()
            frame_rate: Fraction = vf.get_frame_rate()
            wait_time = float(1 / frame_rate)
            xres, yres = vf.get_resolution()
            cmd_str = FF_PLAY.format(
                xres=xres,
                yres=yres,
                pix_fmt=pix_fmts[fourcc],
                # Quote the executable so a path containing spaces survives
                # the shlex.split() below as a single argument
                ffplay=shlex.quote(options.ffplay),
            )
            click.echo(f'{cmd_str=}')
            proc = subprocess.Popen(shlex.split(cmd_str), stdin=subprocess.PIPE)
            assert proc.stdin is not None

            try:
                # Since we already have a frame with data, write it to ffplay.
                # The frame object itself is used as the data source
                # (since `VideoFrameSync` supports the buffer protocol)
                proc.stdin.write(vf)

                while receiver.is_connected():
                    # Not the best timing method, but we're using `FrameSync`
                    # to capture frames, so it'll correct things for us.
                    time.sleep(wait_time)
                    receiver.frame_sync.capture_video()
                    if proc.poll() is not None:
                        break
                    proc.stdin.write(vf)
            except BrokenPipeError:
                # ffplay went away between the poll() and the write();
                # treat it the same as a normal player exit.
                pass
        finally:
            if proc is not None:
                proc.kill()
                if proc.stdin is not None:
                    try:
                        proc.stdin.close()
                    except OSError:
                        # Closing flushes buffered data into a dead pipe;
                        # nothing useful can be done about that here.
                        pass
                # Reap the child so it is not left behind as a zombie
                proc.wait()
# NOTE: no docstring on purpose -- click would show it as the command's help text
@click.command()
@click.option(
    '-s', '--sender-name',
    default='ffmpeg_sender',
    type=str,
    help='The NDI source name to connect to',
    show_default=True,
)
@click.option(
    '-f', '--recv-fmt',
    default='uyvy',
    type=click.Choice(choices=[member.name for member in RecvFmt]),
    help='Pixel format',
    show_choices=True,
    show_default=True,
)
@click.option(
    '-b', '--recv-bandwidth',
    default='highest',
    type=click.Choice(choices=[member.name for member in Bandwidth]),
    show_choices=True,
    show_default=True,
)
@click.option(
    '--ffplay',
    default='ffplay',
    type=str,
    help='Name/Path of the "ffplay" executable',
    show_default=True,
)
def main(sender_name: str, recv_fmt: str, recv_bandwidth: str, ffplay: str):
    # Turn the raw command-line strings into their enum counterparts
    color_format = RecvFmt.from_str(recv_fmt)
    bandwidth = Bandwidth.from_str(recv_bandwidth)
    options = Options(
        sender_name=sender_name,
        recv_fmt=color_format,
        recv_bandwidth=bandwidth,
        ffplay=ffplay,
    )
    play(options)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment