Skip to content

Instantly share code, notes, and snippets.

@correl
Created August 27, 2020 22:29
Show Gist options
  • Save correl/a4359439f4f58e8a4f96620eff1185b9 to your computer and use it in GitHub Desktop.
Save correl/a4359439f4f58e8a4f96620eff1185b9 to your computer and use it in GitHub Desktop.
Turntable record detection using audio fingerprinting via PyDejavu
git+https://github.com/worldveil/dejavu.git#egg=PyDejavu
pyalsaaudio
requests
{
"input_device": "dsnoop:1,0",
"dejavu": {
"database": {
"host": "localhost",
"user": "dejavu",
"password": "dejavu",
"database": "dejavu"
},
"database_type": "postgres"
},
"icecast": {
"host": "localhost",
"port": 8000,
"mountpoint": "turntable.mp3",
"admin_user": "admin",
"admin_password": "secret"
}
}
#!/usr/bin/env python3
import audioop
from collections import deque
from itertools import chain
import json
import math
import os
import struct
import sys
import threading
import time
from typing import Any, Deque, Dict, Iterable, List, Optional, Tuple
from urllib.parse import urlencode
import wave
import alsaaudio # type: ignore
from dejavu import Dejavu # type: ignore
from dejavu.base_classes.base_recognizer import BaseRecognizer # type: ignore
from dejavu.config import settings # type: ignore
import requests
# Tunables for the turntable monitor. All durations are in seconds.

# How often the main loop polls, and how much recent audio each poll inspects.
CHECK_INTERVAL: float = 0.5
# Peak amplitude (of 16-bit samples) above which the input counts as playing.
SILENCE_THRESHOLD: int = 100
# Amount of audio the sampler's bounded buffer is sized to retain.
SAMPLE_SECONDS: int = 60
# Added to FINGERPRINT_STORE_SECONDS to form the fingerprint timer delay.
FINGERPRINT_DELAY: int = 5
# Amount of recent audio written to the WAV file when the timer fires.
FINGERPRINT_STORE_SECONDS: int = 30
# Amount of recent audio handed to the recognizer for identification.
FINGERPRINT_IDENTIFY_SECONDS: int = 5
class Sampler(threading.Thread):
    """Background thread that retains the most recent audio from an ALSA device.

    Each captured period is appended to a bounded FIFO sized (at construction
    time) to hold about ``SAMPLE_SECONDS`` of audio; once the FIFO is full the
    oldest periods are dropped automatically.
    """

    def __init__(self, device: str) -> None:
        """Open *device* for signed 16-bit little-endian capture.

        Args:
            device: ALSA PCM device name, e.g. ``"dsnoop:1,0"``.
        """
        super().__init__()
        self.period_size: int = 1024
        self.capture = alsaaudio.PCM(
            device=device,
            type=alsaaudio.PCM_CAPTURE,
            format=alsaaudio.PCM_FORMAT_S16_LE,
            periodsize=self.period_size,
        )
        # NOTE(review): this value is used as a single integer rate below
        # (arithmetic and "%d" formatting) -- confirm getrates() returns a
        # plain int for the configured device rather than a range or list.
        self.framerate: int = self.capture.getrates()
        self.channels: int = self.capture.getchannels()[0]
        # Guards stream_fifo and period_size, which run() updates.
        self.stream_lock = threading.Lock()
        chunks = self.seconds_to_chunks(SAMPLE_SECONDS)
        self.stream_fifo: Deque[bytes] = deque([], maxlen=chunks)
        print(
            "Sampler started [device='%s', r=%d, c=%d, p=%d, s=%d]"
            % (device, self.framerate, self.channels, self.period_size, chunks)
        )

    def get_chunks(self) -> List[bytes]:
        """Return a snapshot copy of the buffered periods, oldest first."""
        with self.stream_lock:
            return list(self.stream_fifo)

    def seconds_to_chunks(self, seconds: float) -> int:
        """Return the number of capture periods needed to cover *seconds*.

        True division is used so the fractional part of periods-per-second is
        kept until the final round-up; flooring the ratio first would
        under-count (44100 Hz / 1024 frames over 60 s is 2584 periods, not
        2580).
        """
        return math.ceil(self.framerate * seconds / self.period_size)

    def run(self) -> None:
        """Read periods from the capture device forever, buffering each one."""
        while True:
            length, data = self.capture.read()
            if length > 0:
                with self.stream_lock:
                    # Track the size actually delivered so later
                    # seconds_to_chunks() calls use it.
                    self.period_size = length
                    self.stream_fifo.append(data)
            else:
                print(
                    "Sampler error (length={}, bytes={})".format(length, len(data)),
                    file=sys.stderr,
                )
class ChunkRecognizer(BaseRecognizer):
    """Recognizer that identifies audio from raw PCM chunks held in memory."""

    def chunks_to_channel_data(
        self, chunks: List[bytes], channels: int
    ) -> List[List[int]]:
        """Decode interleaved 16-bit PCM chunks into one sample list per channel.

        Args:
            chunks: Raw byte strings of interleaved signed 16-bit samples.
            channels: Number of interleaved channels in the stream.

        Returns:
            A list with ``channels`` entries, each holding that channel's
            samples in order.
        """

        def chunk_to_ints(data: bytes) -> Iterable[int]:
            # "<" pins little-endian decoding so the result matches the
            # S16_LE capture format used in this file regardless of the
            # host's native byte order.
            return struct.unpack("<{}h".format(len(data) // 2), data)

        stream = list(chain.from_iterable(map(chunk_to_ints, chunks)))
        # Samples are interleaved, so every ``channels``-th value starting at
        # offset ``channel`` belongs to that channel.
        return [stream[channel::channels] for channel in range(channels)]

    def recognize(self, chunks: List[bytes], channels: int) -> Dict[str, Any]:
        """Run recognition over *chunks* and return results with timings.

        Args:
            chunks: Raw interleaved 16-bit PCM byte strings.
            channels: Number of interleaved channels in the stream.

        Returns:
            A dict keyed by the ``settings`` constants: total wall-clock time,
            the fingerprint/query/align times reported by ``_recognize``, and
            the matches.
        """
        data = self.chunks_to_channel_data(chunks, channels)
        t = time.time()
        matches, fingerprint_time, query_time, align_time = self._recognize(*data)
        t = time.time() - t
        return {
            settings.TOTAL_TIME: t,
            settings.FINGERPRINT_TIME: fingerprint_time,
            settings.QUERY_TIME: query_time,
            settings.ALIGN_TIME: align_time,
            settings.RESULTS: matches,
        }
class Turntable:
    """Monitor the sampled audio, detect play/stop, and publish a title.

    The object is a small state machine (``"idle"`` / ``"playing"``) driven by
    the peak level of recently captured audio. Shortly after playback starts a
    timer fires, the recent audio is passed to the recognizer, and the
    resulting title is pushed to Icecast.
    """

    # Seconds to wait on the Icecast admin request before giving up, so an
    # unresponsive server cannot stall the monitor.
    ICECAST_TIMEOUT_SECONDS = 5

    def __init__(self, config: Dict[str, Any]) -> None:
        """Create the sampler and recognizer from *config*.

        Args:
            config: Parsed settings; ``"input_device"`` and ``"dejavu"`` are
                required, ``"icecast"`` is optional.
        """
        self.config = config
        self.state = "idle"
        self.sampler = Sampler(config["input_device"])
        self.title: Optional[str] = None
        self.fingerprinter: Optional[threading.Timer] = None
        self.fingerprint_audio: Optional[List[bytes]] = None
        self.dejavu = Dejavu(config["dejavu"])
        self.recognizer = ChunkRecognizer(self.dejavu)
        self.set_title(None)

    def start(self) -> None:
        """Enter the playing state and schedule a fingerprint attempt."""
        print("Turntable has started playing")
        self.state = "playing"
        self.set_title("<Record starting...>")
        # Wait long enough for FINGERPRINT_STORE_SECONDS of audio to have
        # accumulated after the initial delay.
        self.fingerprinter = threading.Timer(
            FINGERPRINT_DELAY + FINGERPRINT_STORE_SECONDS,
            self._fingerprint,
        )
        self.fingerprinter.start()

    def stop(self) -> None:
        """Return to the idle state, cancelling any pending fingerprint timer."""
        print("Turntable has stopped")
        if self.fingerprinter:
            self.fingerprinter.cancel()
        self.fingerprint_audio = None
        self.set_title(None)
        self.state = "idle"

    def set_title(self, title: Optional[str] = None) -> None:
        """Record *title* and push it to the Icecast mount's metadata.

        The Icecast settings are read from this instance's own config. The
        update is best-effort: a request failure is reported on stderr and
        does not propagate.

        Args:
            title: Title to publish; ``None`` (or empty) publishes ``"<Idle>"``.
        """
        self.title = title
        display = title or "<Idle>"
        print(f"Title: '{display}'")
        icecast = self.config.get("icecast", dict())
        host = icecast.get("host", "localhost")
        port = icecast.get("port", 8000)
        mountpoint = icecast.get("mountpoint", "")
        admin_user = icecast.get("admin_user", "")
        admin_pass = icecast.get("admin_password", "")
        try:
            requests.get(
                f"http://{host}:{port}/admin/metadata",
                params={
                    "mount": os.path.join("/", mountpoint),
                    "mode": "updinfo",
                    "song": display,
                },
                auth=(admin_user, admin_pass),
                timeout=self.ICECAST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as error:
            print(f"Failed to update Icecast metadata: {error}", file=sys.stderr)

    def run(self) -> None:
        """Start the sampler thread and poll the audio level forever."""
        self.sampler.start()
        while True:
            chunks = self.get_chunks(CHECK_INTERVAL)
            self._update(chunks)
            time.sleep(CHECK_INTERVAL)

    def get_chunks(self, max_seconds: Optional[float] = None) -> List[bytes]:
        """Return buffered chunks, optionally limited to the newest ones.

        Args:
            max_seconds: When truthy, only the trailing chunks covering this
                many seconds are returned; otherwise the whole buffer is.
        """
        chunks = self.sampler.get_chunks()
        if max_seconds:
            num_chunks = self.sampler.seconds_to_chunks(max_seconds)
            return chunks[-num_chunks:]
        else:
            return chunks

    def _update(self, chunks: List[bytes]) -> None:
        """Switch between idle and playing based on the peak level of *chunks*."""
        fragment = b"".join(chunks)
        # Peak absolute value of the 2-byte samples in the fragment.
        maximum = audioop.max(fragment, 2)
        if self.state == "idle" and maximum > SILENCE_THRESHOLD:
            self.start()
        elif self.state == "playing" and maximum <= SILENCE_THRESHOLD:
            self.stop()

    def _fingerprint(self) -> None:
        """Timer callback: save recent audio to a WAV file and try to identify it."""
        chunks = self.get_chunks(FINGERPRINT_STORE_SECONDS)
        self.fingerprint_audio = chunks
        with wave.open("/tmp/fingerprint.wav", "wb") as wavfile:
            wavfile.setsampwidth(2)
            wavfile.setnchannels(self.sampler.channels)
            wavfile.setframerate(self.sampler.framerate)
            wavfile.writeframesraw(b"".join(chunks))
        print("Collected waveform for fingerprinting")
        # Only the newest FINGERPRINT_IDENTIFY_SECONDS are used for matching.
        identify_chunks = self.sampler.seconds_to_chunks(FINGERPRINT_IDENTIFY_SECONDS)
        identification = self.recognizer.recognize(
            chunks[-identify_chunks:],
            self.sampler.channels,
        )
        print(identification)
        if results := identification[settings.RESULTS]:
            self.set_title(results[0][settings.SONG_NAME])
        else:
            self.set_title("Unknown Artist - Unknown Album (Side X)")
if __name__ == "__main__":
    # Script entry point: read the JSON settings from the user's config
    # directory, build the turntable monitor, and run it until interrupted.
    config_path = os.path.expanduser("~/.config/turntable.json")
    with open(config_path, "r") as handle:
        config: Dict[str, Any] = json.loads(handle.read())
    Turntable(config).run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment