Created
August 27, 2020 22:29
-
-
Save correl/a4359439f4f58e8a4f96620eff1185b9 to your computer and use it in GitHub Desktop.
Turntable record detection using audio fingerprinting via PyDejavu
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
git+https://github.com/worldveil/dejavu.git#egg=PyDejavu
pyalsaaudio
requests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "input_device": "dsnoop:1,0",
  "dejavu": {
    "database": {
      "host": "localhost",
      "user": "dejavu",
      "password": "dejavu",
      "database": "dejavu"
    },
    "database_type": "postgres"
  },
  "icecast": {
    "host": "localhost",
    "port": 8000,
    "mountpoint": "turntable.mp3",
    "admin_user": "admin",
    "admin_password": "secret"
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import audioop | |
from collections import deque | |
from itertools import chain | |
import json | |
import math | |
import os | |
import struct | |
import sys | |
import threading | |
import time | |
from typing import Any, Deque, Dict, Iterable, List, Optional, Tuple | |
from urllib.parse import urlencode | |
import wave | |
import alsaaudio # type: ignore | |
from dejavu import Dejavu # type: ignore | |
from dejavu.base_classes.base_recognizer import BaseRecognizer # type: ignore | |
from dejavu.config import settings # type: ignore | |
import requests | |
# Length, in seconds, of the rolling capture buffer kept by the sampler.
SAMPLE_SECONDS = 60
# Seconds of the most recent audio written out for fingerprinting once a
# record has been playing long enough.
FINGERPRINT_STORE_SECONDS = 30
# Seconds of audio, taken from the end of the stored window, that are passed
# to the recognizer for identification.
FINGERPRINT_IDENTIFY_SECONDS = 5
# Peak 16-bit sample amplitude at or below which the input counts as silence.
SILENCE_THRESHOLD = 100
# Extra seconds added to FINGERPRINT_STORE_SECONDS before the fingerprint
# timer fires after playback starts.
FINGERPRINT_DELAY = 5
# Seconds between silence checks in the main loop; also the length of the
# audio window inspected on each check.
CHECK_INTERVAL = 0.5
class Sampler(threading.Thread):
    """Thread that keeps a rolling window of audio read from an ALSA device.

    Captured periods ("chunks") are appended to a bounded deque sized to hold
    roughly ``SAMPLE_SECONDS`` of audio; the oldest chunks fall off the front
    once the deque is full.
    """

    def __init__(self, device: str) -> None:
        """Open *device* for signed 16-bit little-endian capture.

        Args:
            device: ALSA device name passed to ``alsaaudio.PCM``.
        """
        super().__init__()
        self.period_size: int = 1024
        self.capture = alsaaudio.PCM(
            device=device,
            type=alsaaudio.PCM_CAPTURE,
            format=alsaaudio.PCM_FORMAT_S16_LE,
            periodsize=self.period_size,
        )
        # NOTE(review): pyalsaaudio documents getrates() as returning either a
        # single rate, a (min, max) tuple, or a list depending on the device.
        # The code below assumes a single integer - confirm for your device.
        self.framerate: int = self.capture.getrates()
        # getchannels() yields a sequence; the first entry is used.
        self.channels: int = self.capture.getchannels()[0]
        # Guards stream_fifo and period_size, which run() mutates from the
        # sampler thread while other threads read them.
        self.stream_lock = threading.Lock()
        chunks = self.seconds_to_chunks(SAMPLE_SECONDS)
        self.stream_fifo: Deque[bytes] = deque([], maxlen=chunks)
        print(
            "Sampler started [device='%s', r=%d, c=%d, p=%d, s=%d]"
            % (device, self.framerate, self.channels, self.period_size, chunks)
        )

    def get_chunks(self) -> List[bytes]:
        """Return a snapshot (copy) of the buffered chunks, oldest first."""
        with self.stream_lock:
            return list(self.stream_fifo)

    def seconds_to_chunks(self, seconds: float) -> int:
        """Return how many chunks are needed to cover *seconds* of audio.

        True division is used so that the fractional chunks-per-second is
        kept before rounding up; floor-dividing first would under-size the
        window (e.g. 48000 Hz / 1024 frames loses almost one chunk per
        second).
        """
        return math.ceil(self.framerate / self.period_size * seconds)

    def run(self) -> None:
        """Read from the capture device forever, buffering each period."""
        while True:
            length, data = self.capture.read()
            if length > 0:
                with self.stream_lock:
                    # Track the number of frames actually delivered so later
                    # seconds_to_chunks() calls reflect the real chunk size.
                    self.period_size = length
                    self.stream_fifo.append(data)
            else:
                print(
                    "Sampler error (length={}, bytes={})".format(length, len(data)),
                    file=sys.stderr,
                )
class ChunkRecognizer(BaseRecognizer):
    """Recognizer that accepts raw interleaved 16-bit PCM chunks."""

    def chunks_to_channel_data(
        self, chunks: List[bytes], channels: int
    ) -> List[List[int]]:
        """Split interleaved PCM chunks into one sample list per channel.

        Args:
            chunks: Byte strings of interleaved signed 16-bit samples.
            channels: Number of interleaved channels in the stream.

        Returns:
            A list with ``channels`` entries, each the samples of one channel.
        """

        def chunk_to_ints(data: bytes) -> Iterable[int]:
            # "<" pins little-endian decoding so the result matches the
            # S16_LE capture format regardless of the host's byte order.
            return struct.unpack("<{}h".format(len(data) // 2), data)

        # from_iterable avoids expanding every chunk into call arguments.
        stream = list(chain.from_iterable(map(chunk_to_ints, chunks)))
        # Samples are interleaved, so channel c is every channels-th sample
        # starting at offset c.
        return [stream[channel::channels] for channel in range(channels)]

    def recognize(self, chunks: List[bytes], channels: int) -> Dict[str, Any]:
        """Run recognition over *chunks* and return results with timings.

        Returns:
            A dict keyed by the ``settings`` constants TOTAL_TIME,
            FINGERPRINT_TIME, QUERY_TIME, ALIGN_TIME and RESULTS.
        """
        data = self.chunks_to_channel_data(chunks, channels)
        t = time.time()
        matches, fingerprint_time, query_time, align_time = self._recognize(*data)
        t = time.time() - t
        return {
            settings.TOTAL_TIME: t,
            settings.FINGERPRINT_TIME: fingerprint_time,
            settings.QUERY_TIME: query_time,
            settings.ALIGN_TIME: align_time,
            settings.RESULTS: matches,
        }
class Turntable:
    """Detect turntable playback, identify the record and publish its title.

    The object moves between two states, ``"idle"`` and ``"playing"``, based
    on the peak level of the most recently captured audio. When playback
    starts, a timer is armed; when it fires the recent audio is saved and
    passed to the recognizer, and the resulting title is sent to Icecast.
    """

    # Seconds to wait for the Icecast admin endpoint before giving up, so an
    # unresponsive server cannot stall the detection loop indefinitely.
    ICECAST_TIMEOUT_SECONDS = 5

    def __init__(self, config: Dict[str, Any]) -> None:
        """Create the sampler and recognizer described by *config*.

        Args:
            config: Parsed configuration; must contain ``"input_device"`` and
                ``"dejavu"``, and may contain ``"icecast"``.
        """
        self.config = config
        self.state = "idle"
        self.sampler = Sampler(config["input_device"])
        self.title: Optional[str] = None
        self.fingerprinter: Optional[threading.Timer] = None
        self.fingerprint_audio: Optional[List[bytes]] = None
        self.dejavu = Dejavu(config["dejavu"])
        self.recognizer = ChunkRecognizer(self.dejavu)
        self.set_title(None)

    def start(self) -> None:
        """Enter the playing state and arm the fingerprint timer."""
        print("Turntable has started playing")
        self.state = "playing"
        self.set_title("<Record starting...>")
        self.fingerprinter = threading.Timer(
            FINGERPRINT_DELAY + FINGERPRINT_STORE_SECONDS,
            self._fingerprint,
        )
        self.fingerprinter.start()

    def stop(self) -> None:
        """Return to the idle state, cancelling any pending fingerprint."""
        print("Turntable has stopped")
        if self.fingerprinter:
            self.fingerprinter.cancel()
        self.fingerprint_audio = None
        self.set_title(None)
        self.state = "idle"

    def set_title(self, title: Optional[str] = None) -> None:
        """Record *title* and push it to the Icecast mount's metadata.

        A falsy title is shown as ``"<Idle>"``. The Icecast update is
        best-effort: connection errors, timeouts and non-success responses
        are reported on stderr rather than raised.
        """
        self.title = title
        display = title or "<Idle>"
        print(f"Title: '{display}'")
        # Read the instance's own configuration rather than a module-level
        # global, so the class also works when imported from other code.
        icecast = self.config.get("icecast", dict())
        host = icecast.get("host", "localhost")
        port = icecast.get("port", 8000)
        mountpoint = icecast.get("mountpoint", "")
        admin_user = icecast.get("admin_user", "")
        admin_pass = icecast.get("admin_password", "")
        try:
            response = requests.get(
                f"http://{host}:{port}/admin/metadata",
                params={
                    "mount": os.path.join("/", mountpoint),
                    "mode": "updinfo",
                    "song": display,
                },
                auth=(admin_user, admin_pass),
                timeout=self.ICECAST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as error:
            print(f"Icecast metadata update failed: {error}", file=sys.stderr)
            return
        if not response.ok:
            print(
                f"Icecast metadata update rejected (HTTP {response.status_code})",
                file=sys.stderr,
            )

    def run(self) -> None:
        """Start sampling and poll the input level forever."""
        self.sampler.start()
        while True:
            chunks = self.get_chunks(CHECK_INTERVAL)
            self._update(chunks)
            time.sleep(CHECK_INTERVAL)

    def get_chunks(self, max_seconds: Optional[float] = None) -> List[bytes]:
        """Return buffered chunks, optionally only the last *max_seconds*.

        Args:
            max_seconds: When truthy, limit the result to the newest chunks
                covering that many seconds; otherwise return everything.
        """
        chunks = self.sampler.get_chunks()
        if max_seconds:
            num_chunks = self.sampler.seconds_to_chunks(max_seconds)
            return chunks[-num_chunks:]
        else:
            return chunks

    def _update(self, chunks: List[bytes]) -> None:
        """Switch between idle and playing based on the peak level of *chunks*."""
        fragment = b"".join(chunks)
        # Width 2 = 16-bit samples; yields the largest absolute sample value.
        maximum = audioop.max(fragment, 2)
        if self.state == "idle" and maximum > SILENCE_THRESHOLD:
            self.start()
        elif self.state == "playing" and maximum <= SILENCE_THRESHOLD:
            self.stop()

    def _fingerprint(self) -> None:
        """Save the recent audio, identify it and publish the title.

        Invoked by the timer armed in ``start()``.
        """
        chunks = self.get_chunks(FINGERPRINT_STORE_SECONDS)
        self.fingerprint_audio = chunks
        with wave.open("/tmp/fingerprint.wav", "wb") as wavfile:
            wavfile.setsampwidth(2)
            wavfile.setnchannels(self.sampler.channels)
            wavfile.setframerate(self.sampler.framerate)
            wavfile.writeframesraw(b"".join(chunks))
        print("Collected waveform for fingerprinting")
        identify_chunks = self.sampler.seconds_to_chunks(FINGERPRINT_IDENTIFY_SECONDS)
        identification = self.recognizer.recognize(
            chunks[-identify_chunks:],
            self.sampler.channels,
        )
        print(identification)
        # Timer.cancel() cannot interrupt a callback that is already running;
        # if playback stopped while recognition was in progress, do not
        # overwrite the idle title with a stale result.
        if self.state != "playing":
            return
        if results := identification[settings.RESULTS]:
            name = results[0][settings.SONG_NAME]
            # NOTE(review): dejavu appears to return the song name as UTF-8
            # bytes - confirm; decode so the title is always text.
            if isinstance(name, bytes):
                name = name.decode("utf-8", errors="replace")
            self.set_title(name)
        else:
            self.set_title("Unknown Artist - Unknown Album (Side X)")
if __name__ == "__main__":
    # Settings are read from a fixed path in the user's config directory.
    config_filename = os.path.expanduser("~/.config/turntable.json")
    # JSON text is UTF-8; be explicit instead of relying on the locale's
    # default encoding.
    with open(config_filename, "r", encoding="utf-8") as config_file:
        # Deliberately bound at module level under the name `config`; other
        # code in this file may look the name up as a global.
        config: Dict[str, Any] = json.load(config_file)
    turntable = Turntable(config)
    turntable.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment