Last active
March 14, 2022 23:34
-
-
Save nocarryr/33d9eabc8a4ca6034c26fdd3b0dd0a7d to your computer and use it in GitHub Desktop.
Experimenting with the Birddog REST API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import typing as tp | |
import enum | |
from dataclasses import dataclass | |
import json | |
from bs4 import BeautifulSoup | |
import requests | |
import click | |
class OperationMode(enum.Enum): | |
encode = enum.auto() | |
decode = enum.auto() | |
class AudioOutput(enum.Enum): | |
DecodeMain = enum.auto() | |
DecodeComms = enum.auto() | |
DecodeLoop = enum.auto() | |
class VideoOutput(enum.Enum): | |
sdi = enum.auto() | |
hdmi = enum.auto() | |
LowLatency = enum.auto() | |
NormalMode = enum.auto() | |
@dataclass | |
class AudioOutputSetup: | |
input_gain: int | |
output_gain: int | |
output_select: AudioOutput | |
@classmethod | |
def from_api(cls, data: tp.Dict) -> 'AudioOutputSetup': | |
kw = dict( | |
input_gain=int(data['AnalogAudioInGain']), | |
output_gain=int(data['AnalogAudioOutGain']), | |
output_select=getattr(AudioOutput, data['AnalogAudiooutputselect']), | |
) | |
return cls(**kw) | |
@dataclass | |
class DeviceSettings: | |
operation_mode: OperationMode | |
video_output: VideoOutput | |
audio_setup: AudioOutputSetup | |
def to_form_data(self) -> tp.Dict: | |
form_data = { | |
'mode': self.operation_mode.name, | |
'vid12g_loop_if': self.video_output.name, | |
'AnalogAudioInGain': self.audio_setup.input_gain, | |
'AnalogAudioOutGain': self.audio_setup.output_gain, | |
'AnalogAudiooutputselect': self.audio_setup.output_select.name, | |
} | |
return form_data | |
@dataclass | |
class NdiSource: | |
name: str | |
address: tp.Optional[str] = None | |
index: tp.Optional[int] = None | |
is_current: tp.Optional[bool] = False | |
def format(self): | |
if self.is_current: | |
prefix = '-->' | |
else: | |
prefix = ' ' | |
ix = self.index | |
if ix is None: | |
ix = ' ' | |
else: | |
ix = f'{ix:2d}' | |
return f'{prefix} [{ix}] "{self.name}" ({self.address})' | |
def format_url(base_url: str, api_method: str) -> str: | |
base_url = base_url.rstrip('/') | |
if not base_url.endswith(':8080'): | |
base_url = f'{base_url}:8080' | |
if '://' not in base_url: | |
base_url = f'http://{base_url}' | |
return f'{base_url}/{api_method}' | |
class AuthClient: | |
def __init__(self, base_url: str, password: str = 'birddog'): | |
if '://' not in base_url: | |
base_url = f'http://{base_url}' | |
base_url = base_url.rstrip('/') | |
self.base_url = base_url | |
self.password = password | |
self.session = requests.Session() | |
self.settings = None | |
self._logged_in = False | |
def format_url(self, *paths) -> str: | |
full_path = '/'.join(paths) | |
return f'{self.base_url}/{full_path}' | |
def get(self, *paths): | |
if not self._logged_in: | |
self._login() | |
url = self.format_url(*paths) | |
r = self.session.get(url) | |
r.raise_for_status() | |
return r | |
def post(self, *paths, data=None): | |
if not self._logged_in: | |
self._login() | |
url = self.format_url(*paths) | |
if data is not None: | |
r = self.session.post(url, data=data) | |
else: | |
r = self.session.post(url) | |
r.raise_for_status() | |
return r | |
def get_settings(self): | |
r = self.get('videoset') | |
soup = BeautifulSoup(r.content, 'html5lib') | |
mode_form = soup.find(id='mod_sel') | |
op_mode = None | |
for input_id in ['encode', 'decode']: | |
in_el = mode_form.find(id=input_id) | |
if 'checked' in in_el.attrs: | |
op_mode = input_id | |
assert op_mode is not None | |
op_mode = getattr(OperationMode, op_mode) | |
vout = None | |
for input_id in ['sdi', 'hdmi']: | |
in_el = mode_form.find(id=input_id) | |
if 'checked' in in_el.attrs: | |
vout = input_id | |
assert vout is not None | |
vout = getattr(VideoOutput, vout) | |
audio_setup = get_audio_setup(self.base_url) | |
self.settings = DeviceSettings( | |
operation_mode=op_mode, | |
video_output=vout, | |
audio_setup=audio_setup, | |
) | |
return self.settings | |
def set_operation_mode(self, mode: OperationMode): | |
settings = self.get_settings() | |
settings.operation_mode = mode | |
form_data = settings.to_form_data() | |
self.post('videoset', data=form_data) | |
def set_video_output(self, video_output: VideoOutput): | |
settings = self.get_settings() | |
settings.video_output = video_output | |
form_data = settings.to_form_data() | |
self.post('videoset', data=form_data) | |
def refresh_sources(self): | |
self.post('videoset', data={'add_new_sources':'new_sources'}) | |
def _logout(self): | |
url = self.format_url('logout') | |
r = self.session.post(url) | |
r.raise_for_status() | |
self._logged_in = False | |
def _login(self): | |
url = self.format_url('login') | |
r = self.session.post(url, data={'auth_password':self.password}) | |
r.raise_for_status() | |
self._logged_in = True | |
def __enter__(self): | |
self._login() | |
return self | |
def __exit__(self, *args): | |
try: | |
if self._logged_in: | |
self._logout | |
finally: | |
self.session.close() | |
self.session = requests.Session() | |
def get(base_url: str, api_method: str, params=None) -> tp.Union[bytes, tp.Dict]: | |
url = format_url(base_url, api_method) | |
if params is not None: | |
r = requests.get(url, params=params) | |
else: | |
r = requests.get(url) | |
r.raise_for_status() | |
try: | |
content = r.json() | |
except json.JSONDecodeError: | |
content = r.content | |
return content | |
def post( | |
base_url: str, api_method: str, | |
data: tp.Optional[tp.Dict] = None, timeout: tp.Optional[float] = None, | |
form_encoded: bool = False, | |
) -> tp.Union[bytes, tp.Dict]: | |
if data is None: | |
kw = {'headers':{'Accept':'text'}} | |
elif form_encoded: | |
kw = {'headers':{'Accept':'text'}, 'data':data} | |
else: | |
kw = { | |
'headers':{'Content-Type': 'application/json'}, | |
'data':json.dumps(data), | |
} | |
if timeout is not None: | |
kw['timeout'] = timeout | |
url = format_url(base_url, api_method) | |
r = requests.post(url, **kw) | |
r.raise_for_status() | |
try: | |
content = r.json() | |
except json.JSONDecodeError: | |
content = r.content | |
return content | |
def get_hostname(base_url: str) -> str: | |
content = get(base_url, 'hostname') | |
if isinstance(content, bytes): | |
content = content.decode() | |
return content | |
def reboot(base_url: str): | |
# Send a request to make sure the device is on the network | |
# (since we're ignoring the reboot response) | |
h = get_hostname(base_url) | |
try: | |
r = get(base_url, 'reboot') | |
except requests.ConnectionError: | |
pass | |
def restart(base_url: str): | |
# Send a request to make sure the device is on the network | |
# (since we're ignoring the reboot response) | |
h = get_hostname(base_url) | |
try: | |
r = get(base_url, 'restart') | |
except requests.ConnectionError: | |
raise | |
def get_mode(base_url: str) -> OperationMode: | |
content = get(base_url, 'operationmode') | |
if isinstance(content, dict): | |
try: | |
mode = content['mode'] | |
except KeyError: | |
print(f'{content=}') | |
raise | |
else: | |
mode = content.decode() | |
return getattr(OperationMode, mode) | |
def set_mode(base_url: str, mode: tp.Union[str, OperationMode]): | |
if isinstance(mode, str): | |
mode = getattr(OperationMode, mode) | |
with AuthClient(base_url) as client: | |
client.set_operation_mode(mode) | |
def get_audio_setup(base_url: str) -> AudioOutputSetup: | |
content = get(base_url, 'analogaudiosetup') | |
return AudioOutputSetup.from_api(content) | |
# TODO: The devices respond to this api endpoint with a 404. Firmware bug? | |
def get_video_output(base_url: str) -> VideoOutput: | |
with AuthClient(base_url) as client: | |
settings = client.get_settings() | |
return settings.video_output | |
def set_video_output(base_url: str, video_output: tp.Union[str, VideoOutput]): | |
if isinstance(video_output, str): | |
video_output = getattr(VideoOutput, video_output) | |
with AuthClient(base_url) as client: | |
client.set_video_output(video_output) | |
def list_sources(base_url: str) -> tp.Iterable[NdiSource]: | |
current_src = get_source(base_url) | |
content = get(base_url, 'List') | |
for i, key in enumerate(content.keys()): | |
val = content[key] | |
is_current = key == current_src.name | |
yield NdiSource(name=key, address=val, index=i, is_current=is_current) | |
def get_source(base_url: str) -> NdiSource: | |
content = get(base_url, 'connectTo') | |
return NdiSource(name=content['sourceName']) | |
def set_source(base_url: str, source: tp.Union[str, NdiSource, int]): | |
if isinstance(source, int): | |
source_index = source | |
sources = {src.index:src for src in list_sources(base_url)} | |
source = sources[source_index] | |
if isinstance(source, NdiSource): | |
source = source.name | |
content = post(base_url, 'connectTo', {'sourceName':source}) | |
assert content == b'success' | |
def refresh_sources(base_url: str): | |
with AuthClient(base_url) as client: | |
client.refresh_sources() | |
@click.group() | |
@click.argument('url', type=str) | |
@click.pass_context | |
def cli(ctx, **kwargs): | |
ctx.ensure_object(dict) | |
ctx.obj.update({k:v for k,v in kwargs.items()}) | |
@cli.command() | |
@click.pass_context | |
def hostname(ctx): | |
h = get_hostname(ctx.obj['url']) | |
hurl = f'http://{h}.local' | |
click.echo(f'{h} -> {hurl}') | |
@cli.command(name='reboot') | |
@click.pass_context | |
def cli_reboot(ctx): | |
reboot(ctx.obj['url']) | |
click.echo('Device rebooting') | |
@cli.command(name='restart') | |
@click.pass_context | |
def cli_restart(ctx): | |
restart(ctx.obj['url']) | |
click.echo('Video system restarting') | |
@cli.group() | |
@click.pass_context | |
def mode(ctx): | |
pass | |
@mode.command(name='get') | |
@click.pass_context | |
def cli_mode_get(ctx): | |
current_mode = get_mode(ctx.obj['url']) | |
click.echo(f'Current Mode: "{current_mode.name}"') | |
@mode.command(name='set') | |
@click.argument('new_mode', type=click.Choice(['encode', 'decode'])) | |
@click.pass_context | |
def cli_mode_set(ctx, new_mode): | |
set_mode(ctx.obj['url'], new_mode) | |
click.echo(f'Mode set to "{new_mode}". Rebooting device...') | |
reboot(ctx.obj['url']) | |
@cli.group(name='audio') | |
@click.pass_context | |
def cli_audio(ctx): | |
pass | |
@cli_audio.command(name='get') | |
@click.pass_context | |
def cli_audio_get(ctx): | |
audio = get_audio_setup(ctx.obj['url']) | |
click.echo(str(audio)) | |
# TODO: Disabled as part of :func:`get_video_output` and :func:`set_video_output` issues | |
@cli.group(name='output') | |
@click.pass_context | |
def cli_output(ctx): | |
pass | |
@cli_output.command(name='get') | |
@click.pass_context | |
def cli_output_get(ctx): | |
output = get_video_output(ctx.obj['url']) | |
click.echo(f'Video output: "{output.name}"') | |
@cli_output.command(name='set') | |
@click.argument('mode', type=click.Choice(['sdi', 'hdmi'])) | |
@click.pass_context | |
def cli_output_set(ctx, mode): | |
set_video_output(ctx.obj['url'], mode) | |
click.echo(f'Video output set to "{mode.name}"') | |
@cli.group() | |
@click.pass_context | |
def source(ctx): | |
pass | |
@source.command(name='refresh') | |
@click.pass_context | |
def source_refresh(ctx): | |
click.echo('Refreshing...') | |
refresh_sources(ctx.obj['url']) | |
click.echo('Available Sources:') | |
for src in list_sources(ctx.obj['url']): | |
click.echo(src.format()) | |
@source.command(name='current') | |
@click.pass_context | |
def current_source(ctx): | |
src = get_source(ctx.obj['url']) | |
click.echo(f'Current Source:') | |
click.echo(src.format()) | |
@source.command(name='list') | |
@click.pass_context | |
def cli_list_sources(ctx): | |
click.echo('Available Sources:') | |
for src in list_sources(ctx.obj['url']): | |
click.echo(src.format()) | |
@source.command(name='set') | |
@click.argument('source', type=str) | |
@click.pass_context | |
def cli_set_source(ctx, source): | |
if source.isdigit(): | |
source = int(source) | |
set_source(ctx.obj['url'], source) | |
for src in list_sources(ctx.obj['url']): | |
click.echo(src.format()) | |
if __name__ == '__main__': | |
cli() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment