Skip to content

Instantly share code, notes, and snippets.

@jwoglom
Last active July 30, 2024 05:57
Show Gist options
  • Save jwoglom/090020e25755894042701cb6b56cb9ce to your computer and use it in GitHub Desktop.
Save jwoglom/090020e25755894042701cb6b56cb9ce to your computer and use it in GitHub Desktop.
import json
import socket # Just so we can properly handle hostname exceptions
import obspython as obs
import paho.mqtt.client as mqtt
import ssl
import pathlib
import time
import enum
import uuid
from multiprocessing import Lock
# Meta
__version__ = '1.0.3'
__version_info__ = (1, 0, 3)
__license__ = "AGPLv3"
__license_info__ = {
"AGPLv3": {
"product": "update_mqtt_status_homeassistant",
"users": 0, # 0 being unlimited
"customer": "Unsupported",
"version": __version__,
"license_format": "1.0",
}
}
__author__ = 'HeedfulCrayon and jwoglom'
__doc__ = """\
Publishes real-time OBS status info to the given MQTT server/port/channel \
at the configured interval. Also opens up controllable aspects of OBS to \
be controlled through MQTT.
"""
# Default values for the configurable options:
INTERVAL = 5 # Update interval (in seconds)
MQTT_HOST = "localhost" # Hostname of your MQTT server
MQTT_USER = ""
MQTT_PW = ""
MQTT_PORT = 1883 # Default MQTT port is 1883
MQTT_BASE_CHANNEL = ""
MQTT_SENSOR_NAME = "obs"
PROFILES = []
SCENES = []
PROFILE = None
SCENE = None
STREAM_SWITCH = None
RECORD_SWITCH = None
SENSOR = None
CONTROL = False
DEBUG = True
PROFILE_LOCK = Lock()
SCENE_LOCK = Lock()
CALLBACK_LOCK = Lock()
MAC = ':'.join(['{:02x}'.format((uuid.getnode() >> ele) & 0xff)
for ele in range(0,8*6,8)][::-1])
class SwitchType(str, enum.Enum):
profile = "profile"
scene = "scene"
record = "record"
stream = "stream"
class SwitchPayload(str, enum.Enum):
OFF = "OFF"
ON = "ON"
class Switch:
"""
Represents a controllable aspect of OBS (Profile, Record, Stream, etc.)
"""
def __init__(self):
self.publish_config()
self.subscribe()
self.publish_command(SwitchPayload.OFF)
def publish_config(self):
CLIENT.publish(self.config_topic, json.dumps(self.config))
if DEBUG: print(f"Published config {self.config['name']}")
def subscribe(self):
CLIENT.subscribe(self.command_topic)
if DEBUG: print(f"Subscribed to {self.config['name']}")
def publish_state(self, payload):
CLIENT.publish(self.state_topic, payload)
if DEBUG: print(f"{self.config['name']} state changed to {payload}")
def publish_command(self, payload):
CLIENT.publish(self.command_topic, payload)
if DEBUG: print(f"{self.config['name']} command published. Payload: {payload}")
class PersistentSwitch(Switch):
"""
Switch that is persisted (retained) in MQTT
"""
def __init__(self):
super().__init__()
self.publish_availability(SwitchPayload.ON)
def publish_config(self):
CLIENT.publish(self.config_topic, json.dumps(self.config), retain=True)
if DEBUG: print(f"Published config {self.config['name']}")
def publish_availability(self, payload):
CLIENT.publish(self.available_topic, payload)
if DEBUG: print(f"{self.config['name']} availability set to {payload}")
class ProfileSwitch(Switch):
def __init__(self, profile_name, mqtt_base_channel, mqtt_sensor_name):
self.profile_name = profile_name
self.mqtt_base_channel = mqtt_base_channel
self.mqtt_sensor_name = mqtt_sensor_name
self.switch_type = SwitchType.profile
self.state_topic = f"{self.mqtt_base_channel}/switch/profile_{self.profile_name}/state"
self.command_topic = f"{self.mqtt_base_channel}/switch/profile_{self.profile_name}/profile/set"
self.config_topic = f"{self.mqtt_base_channel}/switch/profile_{self.profile_name}/config"
self.config = {
"name": f"{self.profile_name} Profile",
"unique_id": f"{self.mqtt_sensor_name}_{self.profile_name}_profile",
"device": {
"name": f"{self.mqtt_sensor_name}",
"identifiers": f"[['mac',{MAC}]]",
"manufacturer": f"OBS Script v.{__version__}",
"sw_version": __version__
},
"state_topic": self.state_topic,
"command_topic": self.command_topic,
"icon": f"mdi:alpha-{self.profile_name[0].lower()}-box",
"payload_on": SwitchPayload.ON,
"payload_off": SwitchPayload.OFF
}
super().__init__()
def publish_remove_config(self):
CLIENT.publish(self.config_topic, "")
if DEBUG: print(f"Removed config {self.config['name']}")
class SceneSwitch(Switch):
def __init__(self, profile_name, scene_name, mqtt_base_channel, mqtt_sensor_name):
self.profile_name = profile_name
self.scene_name = scene_name
self.mqtt_base_channel = mqtt_base_channel
self.mqtt_sensor_name = mqtt_sensor_name
self.switch_type = SwitchType.scene
self.state_topic = f"{self.mqtt_base_channel}/switch/profile_{self.profile_name}_scene_{self.scene_name}/state"
self.command_topic = f"{self.mqtt_base_channel}/switch/profile_{self.profile_name}_scene_{self.scene_name}/scene/set"
self.config_topic = f"{self.mqtt_base_channel}/switch/profile_{self.profile_name}_scene_{self.scene_name}/config"
self.config = {
"name": f"{self.profile_name} Profile {self.scene_name} Scene",
"unique_id": f"{self.mqtt_sensor_name}_{self.profile_name}_profile_{self.scene_name}_scene",
"device": {
"name": f"{self.mqtt_sensor_name}",
"identifiers": f"[['mac',{MAC}]]",
"manufacturer": f"OBS Script v.{__version__}",
"sw_version": __version__
},
"state_topic": self.state_topic,
"command_topic": self.command_topic,
"icon": f"mdi:alpha-{self.scene_name[0].lower()}-box",
"payload_on": SwitchPayload.ON,
"payload_off": SwitchPayload.OFF
}
super().__init__()
def publish_remove_config(self):
CLIENT.publish(self.config_topic, "")
if DEBUG: print(f"Removed config {self.config['name']}")
class StreamSwitch(PersistentSwitch):
def __init__(self, mqtt_base_channel, mqtt_sensor_name):
self.mqtt_base_channel = mqtt_base_channel
self.mqtt_sensor_name = mqtt_sensor_name
self.switch_type = SwitchType.stream
self.state_topic = f"{self.mqtt_base_channel}/switch/{self.mqtt_sensor_name}/stream/state"
self.command_topic = f"{self.mqtt_base_channel}/switch/{self.mqtt_sensor_name}/stream/set"
self.config_topic = f"{self.mqtt_base_channel}/switch/{self.mqtt_sensor_name}_stream/config"
self.available_topic = f"{self.mqtt_base_channel}/switch/{self.mqtt_sensor_name}/stream/available"
self.config = {
"name": f"{self.mqtt_sensor_name} Stream",
"unique_id": f"{self.mqtt_sensor_name}_stream",
"device": {
"name": f"{self.mqtt_sensor_name}",
"identifiers": f"[['mac',{MAC}]]",
"manufacturer": f"OBS Script v.{__version__}",
"sw_version": __version__
},
"state_topic": self.state_topic,
"command_topic": self.command_topic,
"payload_on": SwitchPayload.ON,
"payload_off": SwitchPayload.OFF,
"availability": {
"payload_available": SwitchPayload.ON,
"payload_not_available": SwitchPayload.OFF,
"topic": self.available_topic
},
"icon": "mdi:broadcast"
}
super().__init__()
class RecordSwitch(PersistentSwitch):
def __init__(self, mqtt_base_channel, mqtt_sensor_name):
self.mqtt_base_channel = mqtt_base_channel
self.mqtt_sensor_name = mqtt_sensor_name
self.switch_type = SwitchType.record
self.state_topic = f"{self.mqtt_base_channel}/switch/{self.mqtt_sensor_name}/record/state"
self.command_topic = f"{self.mqtt_base_channel}/switch/{self.mqtt_sensor_name}/record/set"
self.config_topic = f"{self.mqtt_base_channel}/switch/{self.mqtt_sensor_name}_record/config"
self.available_topic = f"{self.mqtt_base_channel}/switch/{self.mqtt_sensor_name}/record/available"
self.config = {
"name": f"{self.mqtt_sensor_name} Record",
"unique_id": f"{self.mqtt_sensor_name}_record",
"device": {
"name": f"{self.mqtt_sensor_name}",
"identifiers": f"[['mac',{MAC}]]",
"manufacturer": f"OBS Script v.{__version__}",
"sw_version": __version__
},
"state_topic": self.state_topic,
"command_topic": self.command_topic,
"payload_on": SwitchPayload.ON,
"payload_off": SwitchPayload.OFF,
"availability": {
"payload_available": SwitchPayload.ON,
"payload_not_available": SwitchPayload.OFF,
"topic": self.available_topic
},
"icon": "mdi:record"
}
super().__init__()
class SensorState(str, enum.Enum):
Off = "Off"
Stopped = "Stopped"
Recording = "Recording"
Streaming = "Streaming"
Recording_and_Streaming = "Recording and Streaming"
class Sensor:
def __init__(self, mqtt_base_channel, mqtt_sensor_name):
self.mqtt_base_channel = mqtt_base_channel
self.mqtt_sensor_name = mqtt_sensor_name
self.state_topic = f"{self.mqtt_base_channel}/sensor/{self.mqtt_sensor_name}/state"
self.config_topic = f"{self.mqtt_base_channel}/sensor/{self.mqtt_sensor_name}/config"
self.attributes_topic = f"{self.mqtt_base_channel}/sensor/{self.mqtt_sensor_name}/attributes"
self.config = {
"name": self.mqtt_sensor_name,
"unique_id": self.mqtt_sensor_name,
"device": {
"name": f"{self.mqtt_sensor_name}",
"identifiers": f"[['mac',{MAC}]]",
"manufacturer": f"OBS Script v.{__version__}",
"sw_version": __version__
},
"state_topic": self.state_topic,
"json_attributes_topic": self.attributes_topic
}
self.state = self.get_state
self.previous_state = SensorState.Off
self.recording = obs.obs_frontend_recording_active
self.streaming = obs.obs_frontend_streaming_active
self.paused = obs.obs_frontend_recording_paused
self.replay_buffer =obs.obs_frontend_replay_buffer_active
self.fps = obs.obs_get_active_fps
self.frame_time_ns = obs.obs_get_average_frame_time_ns
self.frames = obs.obs_get_total_frames
self.lagged_frames = obs.obs_get_lagged_frames
self.active = False
self.publish_config()
self.publish_state()
self.publish_attributes()
def publish_config(self):
CLIENT.publish(self.config_topic, json.dumps(self.config))
if DEBUG: print(f"Published config {self.config['name']}")
def publish_attributes(self):
stats = {
"recording": self.recording(),
"streaming": self.streaming(),
"paused": self.paused(),
"fps": self.fps(),
"frame_time_ns": self.frame_time_ns(),
"frames": self.frames(),
"lagged_frames": self.lagged_frames()
}
CLIENT.publish(self.attributes_topic, json.dumps(stats))
self.publish_state()
if DEBUG:
print(f"{self.config['name']} attributes updated")
print(json.dumps(stats))
def get_state(self):
recording = self.recording()
streaming = self.streaming()
if recording and streaming:
self.active = True
self.previous_state = SensorState.Recording_and_Streaming
return SensorState.Recording_and_Streaming
elif streaming:
self.active = True
self.previous_state = SensorState.Streaming
return SensorState.Streaming
elif recording:
self.active = True
self.previous_state = SensorState.Recording
return SensorState.Recording
else:
self.active = False
self.previous_state = SensorState.Stopped
return SensorState.Stopped
def publish_state(self):
state = self.state()
CLIENT.publish(self.state_topic, state)
if DEBUG: print(f"{self.config['name']} state changed to {state}")
def publish_off_state(self):
self.previous_state = SensorState.Off
CLIENT.publish(self.state_topic, SensorState.Off)
if DEBUG: print(f"{self.config['name']} state changed to {SensorState.Off}")
# MQTT Event Functions
def on_mqtt_connect(client, userdata, flags, rc):
"""
Called when the MQTT client is connected from the server. Just prints a
message indicating we connected successfully.
"""
print("MQTT connection successful")
set_homeassistant_config()
def on_mqtt_disconnect(client, userdata, rc):
"""
Called when the MQTT client gets disconnected. Just logs a message about it
(we'll auto-reconnect inside of update_status()).
"""
print("MQTT disconnected. Reason: {}".format(str(rc)))
def on_mqtt_message(client, userdata, message):
"""
Handles MQTT messages that have been subscribed to
"""
payload = str(message.payload.decode("utf-8"))
if DEBUG: print(f"{message.topic}: {payload}")
entity = message_to_switch_entity(message)
if entity != None:
execute_action(entity, payload)
# OBS Script Function Exports
def script_description():
return __doc__ # We wrote a nice docstring... Might as well use it!
def script_load(settings):
"""
Just prints a message indicating that the script was loaded successfully.
"""
global STATE
global CLIENT
print("MQTT script loaded.")
# Using a global MQTT client variable to keep things simple:
print("Creating global mqtt client")
CLIENT = mqtt.Client()
CLIENT.on_connect = on_mqtt_connect
CLIENT.on_disconnect = on_mqtt_disconnect
CLIENT.on_message = on_mqtt_message
STATE = "Initializing"
def script_unload():
"""
Publishes a final status message indicating OBS is off
(so your MQTT sensor doesn't get stuck thinking you're
recording/streaming forever) and calls `CLIENT.disconnect()`.
"""
global STATE
print("Script unloading, getting lock")
CALLBACK_LOCK.acquire()
try:
STATE = "Off"
if CLIENT.is_connected():
SENSOR.publish_off_state()
set_persistent_switch_availability()
remove_profiles_from_homeassistant()
CLIENT.disconnect()
CLIENT.loop_stop()
finally:
CALLBACK_LOCK.release()
def script_defaults(settings):
"""
Sets up our default settings in the OBS Scripts interface.
"""
obs.obs_data_set_default_string(settings, "mqtt_host", MQTT_HOST)
obs.obs_data_set_default_string(settings, "mqtt_user", MQTT_USER)
obs.obs_data_set_default_string(settings, "mqtt_pw", MQTT_PW)
obs.obs_data_set_default_string(settings, "mqtt_base_channel", MQTT_BASE_CHANNEL)
obs.obs_data_set_default_string(settings, "mqtt_sensor_name", MQTT_SENSOR_NAME)
obs.obs_data_set_default_int(settings, "mqtt_port", MQTT_PORT)
obs.obs_data_set_default_int(settings, "interval", INTERVAL)
obs.obs_data_set_default_bool(settings, "controllable", CONTROL)
def script_properties():
"""
Makes this script's settings configurable via OBS's Scripts GUI.
"""
props = obs.obs_properties_create()
obs.obs_properties_add_text(props, "mqtt_host", "MQTT server hostname", obs.OBS_TEXT_DEFAULT)
obs.obs_properties_add_text(props, "mqtt_user", "MQTT username", obs.OBS_TEXT_DEFAULT)
obs.obs_properties_add_text(props, "mqtt_pw", "MQTT password", obs.OBS_TEXT_PASSWORD)
obs.obs_properties_add_text(props, "mqtt_base_channel", "MQTT Base channel",obs.OBS_TEXT_DEFAULT)
obs.obs_properties_add_text(props, "mqtt_sensor_name", "MQTT Sensor Name",obs.OBS_TEXT_DEFAULT)
obs.obs_properties_add_int(props, "mqtt_port", "MQTT TCP/IP port", MQTT_PORT, 65535, 1)
obs.obs_properties_add_int(props, "interval", "Update Interval (seconds)", 1, 3600, 1)
obs.obs_properties_add_bool(props, "controllable", "Control Streaming/Recording via MQTT")
obs.obs_properties_add_bool(props, "debug", "Debug")
return props
def script_update(settings):
"""
Applies any changes made to the MQTT settings in the OBS Scripts GUI then
reconnects the MQTT client.
"""
# Apply the new settings
global MQTT_HOST
global MQTT_USER
global MQTT_PW
global MQTT_PORT
global MQTT_BASE_CHANNEL
global MQTT_SENSOR_NAME
global INTERVAL
global CONTROL
global DEBUG
mqtt_host = obs.obs_data_get_string(settings, "mqtt_host")
if mqtt_host != MQTT_HOST:
MQTT_HOST = mqtt_host
mqtt_user = obs.obs_data_get_string(settings, "mqtt_user")
if mqtt_user != MQTT_USER:
MQTT_USER = mqtt_user
mqtt_pw = obs.obs_data_get_string(settings, "mqtt_pw")
if mqtt_pw != MQTT_PW:
MQTT_PW = mqtt_pw
mqtt_base_channel = obs.obs_data_get_string(settings, "mqtt_base_channel")
if mqtt_base_channel != MQTT_BASE_CHANNEL:
MQTT_BASE_CHANNEL = mqtt_base_channel
mqtt_sensor_name = obs.obs_data_get_string(settings, "mqtt_sensor_name")
if mqtt_sensor_name != MQTT_SENSOR_NAME:
MQTT_SENSOR_NAME = mqtt_sensor_name
mqtt_port = obs.obs_data_get_int(settings, "mqtt_port")
if mqtt_port != MQTT_PORT:
MQTT_PORT = mqtt_port
INTERVAL = obs.obs_data_get_int(settings, "interval")
CONTROL = obs.obs_data_get_bool(settings, "controllable")
# DEBUG = obs.obs_data_get_bool(settings, "debug")
# Disconnect (if connected) and reconnect the MQTT client
CLIENT.disconnect()
try:
if MQTT_PW != "" and MQTT_USER != "":
CLIENT.username_pw_set(MQTT_USER, password=MQTT_PW)
CLIENT.connect_async(MQTT_HOST, MQTT_PORT, 60)
except (socket.gaierror, ConnectionRefusedError) as e:
print("NOTE: Got a socket issue: %s" % e)
pass # Ignore it for now
CALLBACK_LOCK.acquire()
try:
obs.obs_frontend_remove_event_callback(frontend_changed)
obs.obs_frontend_add_event_callback(frontend_changed)
# Remove and replace the timer that publishes our status information
obs.timer_remove(update_status)
obs.timer_add(update_status, INTERVAL * 1000)
CLIENT.loop_start()
finally:
CALLBACK_LOCK.release()
def frontend_changed(event):
"""
Callback for frontend events
"""
names = {
obs.OBS_FRONTEND_EVENT_PROFILE_CHANGED: 'profile_changed',
obs.OBS_FRONTEND_EVENT_PROFILE_LIST_CHANGED: 'profile_list_changed',
obs.OBS_FRONTEND_EVENT_SCENE_CHANGED: 'scene_changed',
obs.OBS_FRONTEND_EVENT_SCENE_LIST_CHANGED: 'scene_list_changed',
obs.OBS_FRONTEND_EVENT_RECORDING_STARTED: 'recording_started',
obs.OBS_FRONTEND_EVENT_RECORDING_STOPPED: 'recording_stopped',
obs.OBS_FRONTEND_EVENT_STREAMING_STARTED: 'streaming_started',
obs.OBS_FRONTEND_EVENT_STREAMING_STOPPED: 'streaming_stopped'
}
switcher = {
obs.OBS_FRONTEND_EVENT_PROFILE_CHANGED: profile_changed,
obs.OBS_FRONTEND_EVENT_PROFILE_LIST_CHANGED: profile_list_changed,
obs.OBS_FRONTEND_EVENT_SCENE_CHANGED: scene_changed,
obs.OBS_FRONTEND_EVENT_SCENE_LIST_CHANGED: scene_list_changed,
obs.OBS_FRONTEND_EVENT_RECORDING_STARTED: recording_started,
obs.OBS_FRONTEND_EVENT_RECORDING_STOPPED: recording_stopped,
obs.OBS_FRONTEND_EVENT_STREAMING_STARTED: streaming_started,
obs.OBS_FRONTEND_EVENT_STREAMING_STOPPED: streaming_stopped
}
function = switcher.get(event, None)
if function != None:
try:
function()
except Exception as e:
print(f'exception occurred trying to call {names[event]} for {event=}: {e=}')
else:
print(f"Unknown event fired: {event}")
def profile_changed():
"""
Callback for OBS_FRONTEND_EVENT_PROFILE_CHANGED
"""
global PROFILE
PROFILE_LOCK.acquire()
try:
if not PROFILE:
return
PROFILE.publish_state(SwitchPayload.OFF)
new_profile = obs.obs_frontend_get_current_profile()
for profile in PROFILES:
if profile.profile_name == new_profile:
profile.publish_state(SwitchPayload.ON)
PROFILE = profile
print("Profile Changed")
finally:
PROFILE_LOCK.release()
def profile_list_changed():
"""
Callback for OBS_FRONTEND_EVENT_PROFILE_LIST_CHANGED
"""
PROFILE_LOCK.acquire()
try:
for profile in PROFILES:
profile.publish_remove_config()
time.sleep(0.1)
setup_profiles_in_homeassistant()
print("Profile List Changed")
finally:
PROFILE_LOCK.release()
def scene_changed():
"""
Callback for OBS_FRONTEND_EVENT_SCENE_CHANGED
"""
global SCENE
CALLBACK_LOCK.acquire()
SCENE_LOCK.acquire()
try:
if not SCENE:
return
SCENE.publish_state(SwitchPayload.OFF)
new_profile = obs.obs_frontend_get_current_profile()
new_scene_raw = obs.obs_frontend_get_current_scene()
new_scene = str(obs.obs_source_get_name(new_scene_raw))
obs.obs_source_release(new_scene_raw)
for scene in SCENES:
if scene.profile_name == new_profile and scene.scene_name == new_scene:
scene.publish_state(SwitchPayload.ON)
SCENE = scene
print("Scene Changed")
finally:
SCENE_LOCK.release()
CALLBACK_LOCK.release()
def scene_list_changed():
"""
Callback for OBS_FRONTEND_EVENT_SCENE_LIST_CHANGED
"""
SCENE_LOCK.acquire()
try:
for scene in SCENES:
scene.publish_remove_config()
time.sleep(0.1)
setup_profiles_in_homeassistant()
print("Scene List Changed")
finally:
SCENE_LOCK.release()
def recording_started():
"""
Publishes state of sensor and record switch
"""
SENSOR.publish_state()
SENSOR.publish_attributes()
if CONTROL and RECORD_SWITCH:
RECORD_SWITCH.publish_state(SwitchPayload.ON)
def recording_stopped():
"""
Publishes state of sensor and record switch
"""
SENSOR.publish_state()
if CONTROL and RECORD_SWITCH:
RECORD_SWITCH.publish_state(SwitchPayload.OFF)
def streaming_started():
"""
Publishes state of sensor and stream switch
"""
SENSOR.publish_state()
SENSOR.publish_attributes()
if CONTROL and STREAM_SWITCH:
STREAM_SWITCH.publish_state(SwitchPayload.ON)
def streaming_stopped():
"""
Publishes state of sensor and stream switch
"""
SENSOR.publish_state()
if CONTROL and STREAM_SWITCH:
STREAM_SWITCH.publish_state(SwitchPayload.OFF)
# Event Helper Functions
def set_homeassistant_config():
"""
Sends initial configuration state and attributes topic
for autodiscovery in Home Assistant
"""
global SENSOR
SENSOR = Sensor(MQTT_BASE_CHANNEL, MQTT_SENSOR_NAME)
if CONTROL:
setup_homeassistant_control()
def setup_homeassistant_control():
"""
Sets up profile, recording and streaming controls
"""
global STREAM_SWITCH
global RECORD_SWITCH
setup_profiles_in_homeassistant()
# Set up switches for autodiscovery
STREAM_SWITCH = StreamSwitch(MQTT_BASE_CHANNEL, MQTT_SENSOR_NAME)
RECORD_SWITCH = RecordSwitch(MQTT_BASE_CHANNEL, MQTT_SENSOR_NAME)
SETUP_HA_LOCK = Lock()
def setup_profiles_in_homeassistant():
"""
Publishes config, and subscribes to the command topic for each profile.
Also sets the current profile's state
"""
global PROFILE
global PROFILES
global SCENE
global SCENES
if DEBUG: print('setup_profiles_in_homeassistant wait')
CALLBACK_LOCK.acquire()
SETUP_HA_LOCK.acquire()
try:
if DEBUG: print('setup_profiles_in_homeassistant start')
current_profile = obs.obs_frontend_get_current_profile()
current_scene_raw = obs.obs_frontend_get_current_scene()
current_scene = str(obs.obs_source_get_name(current_scene_raw))
obs.obs_source_release(current_scene_raw)
profiles = obs.obs_frontend_get_profiles()
scenes = obs.obs_frontend_get_scenes()
if DEBUG: print(f'setup_profiles_in_homeassistant {current_profile=} {current_scene=} {profiles=} {scenes=}')
PROFILES = []
SCENES = []
for profile in profiles:
profile_switch = ProfileSwitch(
profile_name=profile,
mqtt_base_channel=MQTT_BASE_CHANNEL,
mqtt_sensor_name=MQTT_SENSOR_NAME
)
PROFILES.append(profile_switch)
if DEBUG: print(f"Profile {profile_switch.profile_name} added to PROFILES")
if profile_switch.profile_name == current_profile:
PROFILE = profile_switch
profile_switch.publish_state(SwitchPayload.ON)
for raw_scene in scenes:
scene = obs.obs_source_get_name(raw_scene)
scene_switch = SceneSwitch(
profile_name=current_profile,
scene_name=scene,
mqtt_base_channel=MQTT_BASE_CHANNEL,
mqtt_sensor_name=MQTT_SENSOR_NAME
)
SCENES.append(scene_switch)
if DEBUG: print(f"Scene {scene_switch.profile_name}/{scene_switch.scene_name} added to SCENES")
if scene_switch.profile_name == current_profile and scene_switch.scene_name == current_scene:
SCENE = scene_switch
scene_switch.publish_state(SwitchPayload.ON)
obs.source_list_release(scenes)
finally:
SETUP_HA_LOCK.release()
CALLBACK_LOCK.release()
def set_persistent_switch_availability():
"""
Reports the availability of the persistent switches
"""
RECORD_SWITCH.publish_availability(SwitchPayload.OFF)
STREAM_SWITCH.publish_availability(SwitchPayload.OFF)
def remove_profiles_from_homeassistant():
"""
Profiles are removed when obs is not open
"""
global PROFILES
global SCENES
for profile in PROFILES:
profile.publish_remove_config()
for scene in SCENES:
scene.publish_remove_config()
PROFILES = []
SCENES = []
def execute_action(switch, payload):
"""
Executes frontend actions (Profile change, recording, streaming)
"""
if switch.switch_type == SwitchType.profile:
if SENSOR.active: # Not sure if this NEEDS to be here as it won't actually change the profile while streaming/recording
return
prev_profile = obs.obs_frontend_get_current_profile()
if PROFILE.profile_name != switch.profile_name:
obs.obs_frontend_set_current_profile(switch.profile_name)
else:
print(f"Already on profile {switch.profile_name}")
elif switch.switch_type == SwitchType.scene:
if SENSOR.active: # Not sure if this NEEDS to be here as it won't actually change the profile while streaming/recording
return
if PROFILE.profile_name != switch.profile_name:
obs.obs_frontend_set_current_profile(switch.profile_name)
else:
print(f"Already on profile {switch.profile_name}")
if SCENE.scene_name != switch.scene_name:
CALLBACK_LOCK.acquire()
try:
scenes = obs.obs_frontend_get_scenes()
for raw_scene in scenes:
if obs.obs_source_get_name(raw_scene) == switch.scene_name:
obs.obs_frontend_set_current_scene(raw_scene)
obs.source_list_release(scenes)
finally:
CALLBACK_LOCK.release()
else:
print(f"Already on scene {switch.scene_name}")
elif switch.switch_type == SwitchType.stream:
if payload == SwitchPayload.ON:
obs.obs_frontend_streaming_start()
else:
obs.obs_frontend_streaming_stop()
elif switch.switch_type == SwitchType.record:
if payload == SwitchPayload.ON:
obs.obs_frontend_recording_start()
else:
obs.obs_frontend_recording_stop()
# Helper Functions
def update_status():
"""
Updates the STATE and the STATUS global with the stats of the current session.
This info if published (JSON-encoded) to the configured MQTT_HOST/MQTT_PORT/MQTT_BASE_CHANNEL.
Meant to be called at the configured INTERVAL.
"""
if DEBUG: print("update_status wait")
CALLBACK_LOCK.acquire()
if DEBUG: print("update_status running")
try:
global SENSOR
if not SENSOR:
if DEBUG: print("skipping update_status because SENSOR is not defined")
return
if CONTROL and STREAM_SWITCH and RECORD_SWITCH:
STREAM_SWITCH.publish_availability(SwitchPayload.ON)
RECORD_SWITCH.publish_availability(SwitchPayload.ON)
previous_state = SENSOR.previous_state
if previous_state != SensorState.Stopped and SENSOR.state() == SensorState.Stopped:
print("Publishing Final Stopped Message")
SENSOR.publish_attributes()
if previous_state != SensorState.Off and SENSOR.state() == SensorState.Off:
print("Publishing Final Off Message")
SENSOR.publish_attributes()
if SENSOR.active:
SENSOR.publish_attributes()
finally:
CALLBACK_LOCK.release()
if DEBUG: print("update_status done")
def message_to_switch_entity(message):
"""
Converts MQTT Message to the corresponding switch entity
"""
topic = pathlib.PurePosixPath(message.topic)
if DEBUG: print(f'message_to_switch_entity {message.topic=} {topic=}')
stem = topic.parent.stem
message_type = SwitchType[stem] if stem in SwitchType else None # Stream, Record or Profile
if message_type == SwitchType.stream:
return STREAM_SWITCH
if message_type == SwitchType.record:
return RECORD_SWITCH
if message_type == SwitchType.profile:
for profile in PROFILES:
if profile.command_topic == message.topic:
return profile
if message_type == SwitchType.scene:
for scene in SCENES:
if scene.command_topic == message.topic:
return scene
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment