Skip to content

Instantly share code, notes, and snippets.

@nvbn
Last active March 11, 2023 13:16
Show Gist options
  • Save nvbn/73b613849cb176ec33057236b2fd4558 to your computer and use it in GitHub Desktop.
Save nvbn/73b613849cb176ec33057236b2fd4558 to your computer and use it in GitHub Desktop.
Sound lights with spotify, esp8266 and neopixel strip
from __future__ import annotations
import array
import asyncio
from bisect import bisect_left
from dataclasses import dataclass
import logging
import os
import socket
import time
from typing import Union, Dict, Any, NoReturn, AsyncIterable, List, Tuple, Callable
import aiohttp
from spotipy.util import prompt_for_user_token
# Shared communication
# Decoded JSON body of a Spotify Web API response, kept as a plain dict.
RawSpotifyResponse = Dict[str, Any]
@dataclass
class EventSongChanged:
    """Event emitted when a different track starts playing.

    Attributes:
        analysis: audio-analysis payload fetched for the new track.
        start_time: estimated ``time.time()`` value at which the track began.
    """

    analysis: RawSpotifyResponse
    start_time: float
@dataclass
class EventAdjustStartTime:
    """Event carrying a refreshed start-time estimate for the current track.

    Attributes:
        start_time: estimated ``time.time()`` value at which the track began.
    """

    start_time: float
@dataclass
class EventStop:
    """Event emitted while nothing is playing; it carries no data."""
# Any message the Spotify listener can put on the queue for the lights controller.
Event = Union[EventSongChanged, EventAdjustStartTime, EventStop]
# One frame for the strip: a tuple of three integer channel values per LED.
Colors = List[Tuple[int, int, int]]
# Event producer
# Endpoint queried for the track the user is currently playing.
API_CURRENT_PLAYING = 'https://api.spotify.com/v1/me/player/currently-playing'
# Prefix of the audio-analysis endpoint; the track id is appended to it.
API_AUDIO_ANALYSIS = 'https://api.spotify.com/v1/audio-analysis/'
# Scopes requested when obtaining the user token.
SPOTIFY_SCOPE = 'user-read-currently-playing,user-read-playback-state'
# Seconds to sleep between two polls of the currently-playing endpoint.
# NOTE: "DEALY" is a typo for "DELAY"; the name is kept because the polling loop refers to it.
SPOTIFY_CHANGES_LISTENER_DEALY = 1
# Seconds to sleep after the listener failed, before it starts over.
SPOTIFY_CHANGES_LISTENER_FAILURE_DELAY = 1
async def _get_current_playing(session: aiohttp.ClientSession) -> RawSpotifyResponse:
    """Fetch the user's currently playing track from the Spotify Web API.

    Args:
        session: client session that already carries the authorization header.

    Returns:
        The decoded JSON payload. When the API answers ``204 No Content``
        (nothing is playing, so there is no body to decode) a minimal
        ``{'is_playing': False}`` payload is returned instead, so callers can
        treat it like any other "not playing" state.
    """
    async with session.get(API_CURRENT_PLAYING) as response:
        if response.status == 204:
            # Decoding an empty body would raise instead of reporting "stopped".
            return {'is_playing': False}
        return await response.json()
async def _get_audio_analysis(session: aiohttp.ClientSession, id: str) -> RawSpotifyResponse:
    """Fetch the audio analysis of one track.

    Args:
        session: client session that already carries the authorization header.
        id: Spotify track id, appended to the audio-analysis endpoint.

    Returns:
        The decoded JSON payload of the response.
    """
    url = API_AUDIO_ANALYSIS + id
    async with session.get(url) as response:
        analysis = await response.json()
    return analysis
def _get_start_time(current_playing: RawSpotifyResponse, request_time: float) -> float:
    """Estimate the wall-clock time at which the current track started.

    Args:
        current_playing: currently-playing payload; its ``progress_ms`` is read.
        request_time: ``time.time()`` taken just before the request was sent.

    Returns:
        The midpoint between ``request_time`` and now, minus the reported
        progress converted to seconds.
    """
    # The timestamp reported by Spotify appears to be incorrect
    # (https://github.com/spotify/web-api/issues/640), so the moment the
    # progress was measured is approximated by the middle of the round trip.
    response_time = time.time()
    measured_at = (request_time + response_time) / 2
    progress_seconds = current_playing['progress_ms'] / 1000
    return measured_at - progress_seconds
async def _listen_to_spotify_changes(session: aiohttp.ClientSession) -> AsyncIterable[Event]:
    """Poll the currently-playing endpoint forever and yield playback events.

    One event is yielded per poll:

    * ``EventStop`` when playback is paused or the payload carries no track
      that can be analysed (``item`` is null or its ``id`` is null);
    * ``EventSongChanged`` when the track id differs from the previous poll;
    * ``EventAdjustStartTime`` when the same track is still playing.

    Args:
        session: client session that already carries the authorization header.

    Raises:
        KeyError: when the payload has no ``is_playing`` key; the exception is
            left to the caller on purpose.
    """
    current_id = None
    while True:
        request_time = time.time()
        current = await _get_current_playing(session)
        # Indexing (not .get) keeps the KeyError for payloads without this key.
        is_playing = current['is_playing']
        # `item` may be null and a track may have a null id; neither can be
        # looked up in the audio-analysis endpoint.
        track_id = (current.get('item') or {}).get('id')
        if not is_playing or track_id is None:
            current_id = None
            yield EventStop()
        elif track_id != current_id:
            current_id = track_id
            analysis = await _get_audio_analysis(session, current_id)
            yield EventSongChanged(analysis, _get_start_time(current, request_time))
        else:
            yield EventAdjustStartTime(_get_start_time(current, request_time))
        await asyncio.sleep(SPOTIFY_CHANGES_LISTENER_DEALY)
async def spotify_changes_listener(user_id: str,
                                   client_id: str,
                                   client_secret: str,
                                   events_queue: asyncio.Queue[Event]) -> NoReturn:
    """Forward Spotify playback events to ``events_queue`` forever.

    Each round obtains a user token, opens a session with it and pushes every
    event from the polling loop onto the queue. When polling fails the error
    is logged and, after a short delay, a new round starts with a new token
    and session.

    Args:
        user_id: Spotify user name passed to the token prompt.
        client_id: Spotify application client id.
        client_secret: Spotify application client secret.
        events_queue: queue that receives the produced events.
    """
    loop = asyncio.get_running_loop()

    def request_token():
        return prompt_for_user_token(user_id, SPOTIFY_SCOPE,
                                     client_id=client_id,
                                     client_secret=client_secret,
                                     redirect_uri='http://localhost:8000/')

    while True:
        # The token prompt is synchronous; running it in the default executor
        # keeps the event loop (and the lights) running while it works.
        token = await loop.run_in_executor(None, request_token)
        headers = {'Authorization': f'Bearer {token}'}
        async with aiohttp.ClientSession(headers=headers) as session:
            try:
                async for event in _listen_to_spotify_changes(session):
                    await events_queue.put(event)
            except Exception:
                logging.exception('Something went wrong with spotify_changes_listener')
                await asyncio.sleep(SPOTIFY_CHANGES_LISTENER_FAILURE_DELAY)
# Communication with the device
class DeviceBus(asyncio.DatagramProtocol):
    """Datagram protocol for the device link; it only logs transport events.

    Deriving from ``asyncio.DatagramProtocol`` and defining
    ``datagram_received`` gives the transport every callback it may invoke,
    so an incoming datagram no longer fails with ``AttributeError``.
    """

    def connection_made(self, _):
        """Log that the datagram endpoint is ready."""
        logging.info('Connected')

    def datagram_received(self, data, addr):
        """Discard incoming datagrams; nothing is read back from the device."""

    def error_received(self, exc):
        """Log an error reported by a send or receive operation."""
        logging.error('Error received', exc_info=exc)

    def connection_lost(self, exc):
        """Log the end of the connection; ``exc`` is None on a clean close."""
        if exc is None:
            # A regular close is not an error and has no traceback to show.
            logging.info('Connection closed')
        else:
            logging.error('Connection closed', exc_info=exc)
async def make_send_to_device(ip: str, port: int) -> Callable[[Colors], None]:
    """Open a UDP endpoint and return a function that sends frames to the device.

    The socket is created with ``SO_REUSEADDR`` and ``SO_BROADCAST`` enabled
    and handed to the running event loop as a datagram endpoint.

    Args:
        ip: address the frames are sent to.
        port: UDP port the frames are sent to.

    Returns:
        A function taking a list of colour tuples; it flattens them into one
        unsigned byte per channel and sends them as a single datagram.
    """
    loop = asyncio.get_running_loop()
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        transport, _ = await loop.create_datagram_endpoint(DeviceBus, sock=sock)
    except BaseException:
        # Do not leak the descriptor when the endpoint cannot be set up.
        sock.close()
        raise

    def send_to_device(colors: Colors) -> None:
        # 'B' = unsigned byte: every channel must fit into 0..255.
        line = array.array('B', (channel
                                 for color in colors
                                 for channel in color))
        transport.sendto(line.tobytes(), (ip, port))

    return send_to_device
# Light colors selector, spooky, more details in the notebook
# Per-channel ceiling: a channel value of 255 is mapped to the matching entry.
SCALE = (50, 100, 150)
# Factor applied to the beat, tempo and pitch terms before the loudness boost.
BASE_COLOR_MULTIPLIER = 100
# Weight of the scaled section loudness in the boost (1 + weight * loudness).
LOUDNESS_MULTIPLIER = 1.5
def _normalize(pv: float) -> float:
    """Clamp a channel value to the range 0..255.

    Values inside the range are returned unchanged; values below it become
    ``0.`` and values above it become ``255.``.
    """
    if pv < 0:
        return 0.
    return 255. if pv > 255 else pv
def _scale_pixel(p):
    """Clamp the three channels of ``p`` and rescale them to the SCALE ceilings.

    Each channel is clamped to 0..255, multiplied by the matching SCALE entry,
    divided by 255 and truncated to an int.

    Returns:
        A tuple of three ints.
    """
    return tuple(int(_normalize(p[i]) * SCALE[i] / 255) for i in range(3))
def make_get_current_colors(analysis: RawSpotifyResponse, leds: int) -> Callable[[float], Colors]:
    """Build a function that maps a playback position to one frame of colours.

    Args:
        analysis: audio-analysis payload. Its ``segments``, ``sections`` and
            ``beats`` lists are read; every entry needs ``start``, sections
            also ``loudness``, ``tempo`` and ``mode``, beats ``duration`` and
            segments ``pitches``.
        leds: number of LEDs, i.e. the length of every produced frame.

    Returns:
        A function taking the position ``t`` (seconds since the track started)
        and returning ``leds`` colour tuples.

    Raises:
        ValueError: if ``analysis['sections']`` is empty.
    """
    def make_get_current(name):
        # Assumes the entries are ordered by 'start', which bisect requires.
        items = analysis[name]
        starts = [x['start'] for x in items]

        def get_current(t):
            # The entry before the insertion point is the one that started
            # before t. The index is clamped so that a t at or before the
            # first start selects the first entry, not the last (index -1).
            return items[max(bisect_left(starts, t) - 1, 0)]

        return get_current

    get_current_segment = make_get_current('segments')
    get_current_section = make_get_current('sections')
    get_current_beat = make_get_current('beats')

    def make_scale(name):
        values = [x[name] for x in analysis['sections']]
        low = min(values)
        span = max(values) - low
        if span == 0:
            # Single section or identical values: there is no spread to scale
            # against, so use the midpoint instead of dividing by zero.
            return lambda x: 0.5
        return lambda x: (x - low) / span

    scale_loudness = make_scale('loudness')
    scale_tempo = make_scale('tempo')

    def get_current_colors(t):
        segment = get_current_segment(t)
        section = get_current_section(t)
        beat = get_current_beat(t)

        beat_color = BASE_COLOR_MULTIPLIER * (t - beat['start'] + beat['duration']) / beat['duration']
        tempo_color = BASE_COLOR_MULTIPLIER * scale_tempo(section['tempo'])
        pitch_colors = [BASE_COLOR_MULTIPLIER * p for p in segment['pitches']]
        loudness_multiplier = 1 + LOUDNESS_MULTIPLIER * scale_loudness(section['loudness'])

        # The section mode decides which term drives which channel.
        if section['mode'] == 0:
            order = (0, 1, 2)
        elif section['mode'] == 1:
            order = (1, 2, 0)
        else:
            order = (2, 0, 1)

        frame = []
        for n in range(leds):
            # Spread the pitch values evenly over the strip. For strip lengths
            # that are a multiple of the pitch count this equals
            # n // (leds // count); other lengths no longer index out of range
            # or divide by zero.
            pitch_index = n * len(pitch_colors) // leds
            color = (beat_color * loudness_multiplier,
                     tempo_color * loudness_multiplier,
                     pitch_colors[pitch_index] * loudness_multiplier)
            frame.append(_scale_pixel((color[order[0]], color[order[1]], color[order[2]])))
        return frame

    return get_current_colors
def get_empty_colors(leds: int) -> Colors:
    """Return a frame of ``leds`` all-zero colour tuples."""
    return [(0, 0, 0) for _ in range(leds)]
# Events listener, device controller
# Seconds to sleep between two frames produced for the device.
CONTROLLER_TICK = 0.01
# Seconds to sleep after the controller failed, before it starts over.
CONTROLLER_ERROR_DELAY = 1
async def _events_to_colors(leds: int, events_queue: asyncio.Queue[Event]) -> AsyncIterable[Colors]:
    """Turn queued playback events into an endless stream of frames.

    On every tick all pending events are consumed, then one frame is yielded:
    an all-zero frame while no track is active, otherwise the colours for the
    current playback position.

    Args:
        leds: number of LEDs, i.e. the length of every frame.
        events_queue: queue the playback events are read from.
    """
    get_current_colors = None
    start_time = 0
    while True:
        await asyncio.sleep(CONTROLLER_TICK)

        # Consume everything that arrived since the previous tick.
        while True:
            try:
                event = events_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
            if isinstance(event, EventStop):
                get_current_colors = None
            elif isinstance(event, EventAdjustStartTime):
                start_time = event.start_time
            elif isinstance(event, EventSongChanged):
                start_time = event.start_time
                get_current_colors = make_get_current_colors(event.analysis, leds)

        if get_current_colors is not None:
            yield get_current_colors(time.time() - start_time)
        else:
            yield get_empty_colors(leds)
async def lights_controller(device_ip: str,
                            device_port: int,
                            leds: int,
                            events_queue: asyncio.Queue[Event]) -> NoReturn:
    """Send frames derived from playback events to the device forever.

    When producing or sending a frame fails, the error is logged and, after a
    short delay, the controller starts over with a newly created sender.

    Args:
        device_ip: address of the device.
        device_port: UDP port of the device.
        leds: number of LEDs on the strip.
        events_queue: queue the playback events are read from.
    """
    while True:
        # NOTE(review): the endpoint behind a previous sender is not closed
        # before a new one is created here.
        send_frame = await make_send_to_device(device_ip, device_port)
        try:
            frames = _events_to_colors(leds, events_queue)
            async for frame in frames:
                send_frame(frame)
        except Exception:
            logging.exception("Something went wrong with lights_controller")
            await asyncio.sleep(CONTROLLER_ERROR_DELAY)
# Glue
def main():
    """Read the configuration from the environment and run both workers.

    Environment variables read: ``DEVICE_IP``, ``DEVICE_PORT``, ``USER_ID``,
    ``CLIENT_ID``, ``CLIENT_SECRET`` and ``LEDS``.

    Raises:
        KeyError: if one of the variables is not set.
        ValueError: if ``DEVICE_PORT`` or ``LEDS`` is not an integer.
    """
    device_ip = os.environ['DEVICE_IP']
    device_port = int(os.environ['DEVICE_PORT'])
    user_id = os.environ['USER_ID']
    client_id = os.environ['CLIENT_ID']
    client_secret = os.environ['CLIENT_SECRET']
    leds = int(os.environ['LEDS'])

    async def run():
        # Create the queue inside the running loop so it belongs to the loop
        # that asyncio.run() manages.
        events_queue = asyncio.Queue()
        await asyncio.gather(
            spotify_changes_listener(user_id, client_id, client_secret, events_queue),
            lights_controller(device_ip, device_port, leds, events_queue),
        )

    # asyncio.run() creates and closes the loop itself; fetching a loop with
    # get_event_loop() outside of a running loop is deprecated.
    asyncio.run(run())


if __name__ == '__main__':
    main()
import socket
import machine
import neopixel
# Receiver running on the device: every complete UDP datagram is shown on the strip.
LED_COUNT = 60
# One byte per channel, three channels per LED.
FRAME_SIZE = LED_COUNT * 3

np = neopixel.NeoPixel(machine.Pin(5), LED_COUNT)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('', 42424))

while True:
    line, _ = sock.recvfrom(FRAME_SIZE)
    # Skip datagrams that do not hold a complete frame.
    if len(line) < FRAME_SIZE:
        continue
    for i in range(LED_COUNT):
        offset = i * 3
        np[i] = (line[offset], line[offset + 1], line[offset + 2])
    np.write()
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
#!/bin/sh
# Exports the settings the Python script reads from the environment, then starts it.
# Fill in the empty values before running.
# From https://developer.spotify.com/dashboard/applications
export USER_ID=''
export CLIENT_ID=''
export CLIENT_SECRET=''
# Address and UDP port of the device driving the strip.
export DEVICE_IP=''
export DEVICE_PORT='42424'
# Number of LEDs on the strip.
export LEDS=60
python researched_refactored.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment