Created
January 15, 2024 07:47
-
-
Save poohsen/bdf495251bae80590a268a46c12e3ab6 to your computer and use it in GitHub Desktop.
duck avr when mycroft listens
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
from mycroft_bus_client import MessageBusClient, Message | |
from kodijson import Kodi | |
from dotenv import load_dotenv | |
import os | |
import time | |
import threading | |
import re | |
import logging | |
import rxv | |
delayed_unduck_timer = None | |
client = None | |
kodi = None | |
rx = None | |
volume_before_ducking = None | |
def get_rx(): | |
yamaha_fallback_ip = "192.168.2.5" | |
print("falling back to %s" % yamaha_fallback_ip) | |
rx = rxv.RXV("http://%s:80/YamahaRemoteControl/ctrl" % yamaha_fallback_ip, "RX-V473") | |
return rx | |
def get_current_volume(): | |
return int(rx.volume) | |
def duck(): | |
current = get_current_volume() | |
duck_by_db = 20 | |
logging.info(f"ducking from {current} by {duck_by_db}db") | |
global volume_before_ducking | |
volume_before_ducking = current | |
step_size = 2 | |
for i in range(current,int(current-duck_by_db) - step_size,-step_size): | |
#kodi.Application.SetVolume({"volume": i}) | |
print(f"volume {i}") | |
rx.volume = i | |
time.sleep(0.10) | |
def wakeword_handler(message): | |
logging.info('\nMycroft "{}"'.format(message.__dict__)) | |
client.once('recognizer_loop:record_end', record_end_handler) | |
client.once('recognizer_loop:utterance', utterance_handler) | |
duck() | |
def cancel_delayed_unduck(caller): | |
if delayed_unduck_timer: | |
logging.info(f"cancelling delayed unduck (from {caller})") | |
delayed_unduck_timer.cancel() | |
def set_delayed_unduck(delay= 1.5): | |
global delayed_unduck_timer | |
cancel_delayed_unduck("set_delayed_unduck") | |
delayed_unduck_timer = threading.Timer(delay, unregister_and_unduck) | |
delayed_unduck_timer.start() | |
def utterance_handler(message): | |
logging.info('Mycroft "{}"'.format(message.__dict__)) | |
p = re.compile('set .*kodi volume') | |
if not p.match(message.data.get('utterances')[0]): | |
client.once('recognizer_loop:audio_output_start', output_start_handler) | |
# 5s could be too little for long TTS output that's not cached. | |
set_delayed_unduck(delay = 5) | |
# TODO: the weather report comes in several audio parts, | |
# each with their own start and end, so using client.once | |
# doesn't work in that case | |
def output_start_handler(message): | |
logging.info('Mycroft "{}"'.format(message.__dict__)) | |
client.once('recognizer_loop:audio_output_end', output_end_handler) | |
cancel_delayed_unduck("output_start_handler") | |
def output_end_handler(message): | |
logging.info('Mycroft "{}"'.format(message.__dict__)) | |
unduck_music() | |
def unregister_and_unduck(): | |
handlers = [lambda: client.remove_all_listeners('recognizer_loop:utterance'), | |
lambda: client.remove_all_listeners('recognizer_loop:audio_output_start'), | |
lambda: client.remove_all_listeners('recognizer_loop:audio_output_end')] | |
for h in handlers: | |
h() | |
unduck_music() | |
def unduck_music(): | |
current = get_current_volume() | |
step_size = 2 | |
logging.info(f"unducking from {current} to {volume_before_ducking}...") | |
for i in range(current, volume_before_ducking + step_size, step_size): | |
#kodi.Application.SetVolume({"volume": i}) | |
print(f"unducking {i}") | |
rx.volume = i | |
time.sleep(0.1) | |
client.once('recognizer_loop:wakeword', wakeword_handler) | |
def record_end_handler(message): | |
logging.info('Mycroft "{}"'.format(message.__dict__)) | |
set_delayed_unduck() | |
def main(): | |
load_dotenv() | |
kodi_username = os.getenv('KODI_USERNAME') | |
kodi_password = os.getenv('KODI_PASSWORD') | |
kodi_host = os.getenv('KODI_HOST') | |
logging.basicConfig(format='%(asctime)s - %(levelname)s:%(name)s - %(message)s', level=logging.INFO) | |
logging.info('Setting up client to connect to a local mycroft instance') | |
global client | |
#global kodi | |
client = MessageBusClient() | |
#kodi = Kodi(f"http://{kodi_host}/jsonrpc", kodi_username, kodi_password) | |
global rx | |
rx = get_rx() | |
logging.info(f'Registering handlers ...' ) | |
client.once('recognizer_loop:wakeword', wakeword_handler) | |
client.run_forever() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment