Skip to content

Instantly share code, notes, and snippets.

@sirk390
Last active November 6, 2024 14:05
Show Gist options
  • Save sirk390/516b3b181ee3d4855a51cd4b5d9262aa to your computer and use it in GitHub Desktop.
Save sirk390/516b3b181ee3d4855a51cd4b5d9262aa to your computer and use it in GitHub Desktop.
AsyncAudio
import asyncio
import numpy as np
import sounddevice as sd
class AsyncAudio:
def __init__(self, chunk=1024, channels=1, rate=24000, format='int16'):
self.chunk = chunk
self.channels = channels
self.rate = rate
self.format = format
self.stream_play = None
self.stream_record = None
async def record(self):
if not self.stream_record:
self.stream_record = sd.InputStream(
channels=self.channels,
samplerate=self.rate,
dtype=self.format,
blocksize=self.chunk
)
self.stream_record.start()
loop = asyncio.get_running_loop()
try:
data = await loop.run_in_executor(None, lambda: self.stream_record.read(self.chunk)[0]
)
return data.tobytes()
except asyncio.CancelledError:
self.stream_record.stop()
self.stream_record.close()
self.stream_record = None
async def play(self, audio_data):
if not self.stream_play:
self.stream_play = sd.OutputStream(
channels=self.channels,
samplerate=self.rate,
dtype=self.format,
blocksize=self.chunk
)
self.stream_play.start()
loop = asyncio.get_running_loop()
try:
data = np.frombuffer(audio_data, dtype=np.int16)
await loop.run_in_executor(None, self.stream_play.write, data)
except asyncio.CancelledError:
self.stream_play.stop()
self.stream_play.close()
self.stream_play = None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment