Created
May 9, 2022 19:51
-
-
Save buzztiaan/141ab1dc469b8f7db1d362b273b73746 to your computer and use it in GitHub Desktop.
melan's python iets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Melan, 2020
import paho.mqtt.client as mqtt | |
import os | |
import coloredlogs | |
import logging | |
import json | |
import re | |
import threading | |
import math | |
import traceback | |
import pydub | |
import math | |
from mpd import MPDClient | |
from queue import Queue | |
from threading import Thread | |
from pydub import AudioSegment | |
from pydub.playback import play | |
class themesongs():
    """Play a person's theme song when they open the front door.

    The constructor connects to the MQTT broker from ``config["mqtt"]``,
    starts a background playback thread and then runs the MQTT network loop
    (``loop_forever``), so constructing an instance blocks the caller.

    Messages on ``space/door/front`` are expected to be JSON objects with a
    ``name`` field. Every file in ``config["path"]`` whose file name contains
    that name (case-insensitively) is queued for playback. While a theme is
    playing, MPD is paused if it was playing and resumed afterwards.
    """

    config = {
        "path": "/mnt/mp3/themes",
        "mqtt": {
            "host": "arbiter.vm.nurd.space",
            "port": 1883
        },
        "mpd": {
            "host": "localhost",
            "port": 6600,
        }
    }
    log = logging.getLogger("ThemeSongs")
    # Paths of theme files waiting to be played; filled by the MQTT callback
    # and drained by threaded_play().
    play_queue = Queue()

    def __init__(self):
        """Connect to MQTT, start the playback thread and run the MQTT loop."""
        self.log.info("Theme song server is starting")
        self.mqtt = mqtt.Client()
        # Install the callbacks before connecting so they are in place by the
        # time the network loop delivers the first event.
        self.mqtt.on_connect = self.on_mqtt_connect
        self.mqtt.on_message = self.on_mqtt_message
        self.mqtt.connect(self.config['mqtt']['host'], self.config['mqtt']['port'], 60)
        # Daemon thread: the worker loops forever and must not keep the
        # process alive once the MQTT loop (main thread) has ended.
        threading.Thread(target=self.threaded_play, daemon=True).start()
        self.mqtt.loop_forever()

    @staticmethod
    def _attenuation_db(volume_percent):
        """Return the whole-dB attenuation matching an MPD volume percentage.

        Computes ``-round(20 * log10(volume / 100))``, which is the same
        quantity as the previous ``40 * log10(sqrt(volume / 100))`` form, and
        clamps it so the result is never negative.

        Args:
            volume_percent: MPD volume as an int or a numeric string.

        Returns:
            A non-negative int; 0 means "play at full level".

        Raises:
            ValueError: if the volume is not a positive integer percentage
                (the logarithm is undefined for zero or negative values).
        """
        volume = int(volume_percent)
        if volume <= 0:
            raise ValueError("MPD volume %r is not a positive percentage" % (volume_percent,))
        return max(0, -round(20 * math.log10(volume / 100)))

    def _play_with_mpd_paused(self, path):
        """Play one audio file, pausing MPD around it if MPD was playing.

        MPD is always resumed (when this method paused it) and the MPD
        connection is always closed, even if decoding or playback fails.
        """
        mclient = MPDClient()
        mclient.timeout = 10        # network timeout in seconds
        mclient.idletimeout = None  # timeout for the idle command is handled separately
        # NOTE(review): this host is hard-coded and differs from
        # config["mpd"] ("localhost"); kept as-is, confirm which is intended.
        mclient.connect("slabpi.dhcp.nurd.space", 6600)
        paused_by_us = False
        try:
            mpd_status = mclient.status()
            attenuation = self._attenuation_db(mpd_status['volume'])
            self.log.info("Playing file %s", path)
            # The file extension (without the dot) tells pydub the format.
            sound = pydub.AudioSegment.from_file(path, format=os.path.splitext(path)[1][1:])
            self.log.info("Volume: %s", -attenuation)
            # Only pause MPD if it was actually playing.
            if mpd_status['state'] == "play":
                mclient.pause(1)
                paused_by_us = True
            pydub.playback.play(sound - attenuation)
        finally:
            try:
                # Only resume MPD if this method paused it.
                if paused_by_us:
                    mclient.pause(0)
            finally:
                mclient.disconnect()

    def threaded_play(self):
        """Worker loop: take queued file paths and play them one at a time.

        Never returns. Failures for a single file are logged with a traceback
        and the loop carries on with the next queue entry.
        """
        self.log.info("Play Thread started.")
        while True:
            path = self.play_queue.get()
            if not path or not os.path.exists(path):
                continue
            try:
                self._play_with_mpd_paused(path)
            except Exception as e:
                # Top-level boundary of the worker thread: log and keep going.
                self.log.exception('Something failed: %s', e)

    def on_mqtt_connect(self, client, userdata, flags, rc):
        """MQTT connect callback: subscribe to the front-door topic.

        A non-zero ``rc`` is logged as an error and no subscription is made.
        """
        if rc != 0:
            self.log.error("MQTT connection failed (rc=%s)", rc)
            return
        self.log.info("Connected to MQTT")
        self.mqtt.subscribe("space/door/front")

    def on_mqtt_message(self, client, userdata, msg):
        """MQTT message callback: queue the theme songs matching the sender.

        The payload must be UTF-8 JSON holding an object with a non-empty
        string ``name``; anything else is ignored. The name is matched
        literally (not as a regular expression) and case-insensitively
        against the file names in ``config["path"]``.
        """
        try:
            message_json = json.loads(msg.payload.decode("utf-8"))
        except ValueError as e:
            # Covers both invalid UTF-8 and invalid JSON.
            self.log.error("Failed to decode %s (%s)", msg, e)
            return
        if not isinstance(message_json, dict):
            return
        name = message_json.get("name")
        if not isinstance(name, str) or not name:
            return
        self.log.info("%s has opened the door.", name)
        theme_dir = self.config['path']
        try:
            candidates = os.listdir(theme_dir)
        except OSError as e:
            self.log.error("Cannot list theme directory %s (%s)", theme_dir, e)
            return
        # The name comes from the network: escape it so regex metacharacters
        # are matched literally instead of raising or matching too much.
        pattern = re.compile(re.escape(name), re.IGNORECASE)
        for file in candidates:
            if pattern.search(file):
                self.log.info("%s's theme song: %s", name, file)
                self.play_queue.put(os.path.join(theme_dir, file))
# Script entry point: set up logging, then construct the server. The
# constructor runs the MQTT loop itself, so the last line is not expected to
# return during normal operation.
if __name__ == "__main__":
    log_format = "%(asctime)s %(name)s %(levelname)s %(message)s"
    logging.basicConfig(level=logging.INFO)
    coloredlogs.install(fmt=log_format, level='INFO')
    tsong = themesongs()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment