Last active
December 21, 2015 19:19
-
-
Save sturlamolden/6353279 to your computer and use it in GitHub Desktop.
Example of audioprocessing in Windows with Python 2.7 using NumPy and pywin32 (Microsoft DirectSound)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Copyright (C) 2013 Sturla Molden """ | |
import threading | |
import Queue | |
import pywintypes | |
import win32event | |
import win32com.directsound.directsound as directsound | |
import numpy as np | |
class BufferDescriptor(object): | |
""" Buffer descriptor for directsound """ | |
def __init__(self, milliseconds=100): | |
wfxFormat = pywintypes.WAVEFORMATEX() | |
wfxFormat.wFormatTag = pywintypes.WAVE_FORMAT_PCM | |
wfxFormat.nChannels = 2 | |
wfxFormat.nSamplesPerSec = 44100 | |
wfxFormat.nAvgBytesPerSec = 176400 | |
wfxFormat.nBlockAlign = 4 | |
wfxFormat.wBitsPerSample = 16 | |
self.format = wfxFormat | |
self.size = 4*int((44100*milliseconds)/1000.) | |
self.milliseconds = milliseconds | |
self.shape = (self.size//4, 2) | |
self.dtype = np.int16 | |
self.Fs = 44100 | |
class DirectSoundPlayer(threading.Thread): | |
""" A thread that writes spike waveforms to the speaker""" | |
def __init__(self, descriptor, queue, abort): | |
""" | |
descriptor: BufferDescriptor | |
queue: Queue.Queue with playback buffers as np.ndarray | |
abort: threading.Event as abourt signal | |
""" | |
threading.Thread.__init__(self) | |
d = directsound.DirectSoundCreate(None, None) | |
d.SetCooperativeLevel(None, directsound.DSSCL_PRIORITY) | |
sdesc = directsound.DSBUFFERDESC() | |
sdesc.dwFlags = directsound.DSBCAPS_STICKYFOCUS | directsound.DSBCAPS_CTRLPOSITIONNOTIFY | |
sdesc.dwBufferBytes = descriptor.size | |
sdesc.lpwfxFormat = descriptor.format | |
_buffer = d.CreateSoundBuffer(sdesc, None) | |
event = win32event.CreateEvent(None, 0, 0, None) | |
notify = _buffer.QueryInterface(directsound.IID_IDirectSoundNotify) | |
notify.SetNotificationPositions((directsound.DSBPN_OFFSETSTOP, event)) | |
self.device = d | |
self.sdesc = sdesc | |
self.descriptor = descriptor | |
self.buffer = _buffer | |
self.event = event | |
self.notify = notify | |
self.abort = abort | |
self.queue = queue | |
self.timeout = 0.5 * descriptor.milliseconds / 1000. | |
def run(self): | |
self.array = None | |
while not self.abort.isSet(): | |
try: | |
data = self.queue.get(block=True, timeout=self.timeout) | |
#print data.shape, data.dtype | |
self.buffer.Update(0, data.tostring()) # data is a numpy array | |
self.buffer.Play(0) | |
win32event.WaitForSingleObject(self.event, -1) | |
except Queue.Empty: | |
# nothing to play | |
# print 'empty' | |
pass | |
self.array = data | |
class DirectSoundRecorder(threading.Thread): | |
""" A thread that continously reads chunks of data from | |
line-in or the microphone. """ | |
def __init__(self, descriptor, queue, abort): | |
""" | |
descriptor: BufferDescriptor | |
queue: Queue.Queue for writing data as np.ndarray | |
abort: threading.Event as abourt signal | |
""" | |
threading.Thread.__init__(self) | |
d = directsound.DirectSoundCaptureCreate(None, None) | |
sdesc = directsound.DSCBUFFERDESC() | |
sdesc.dwBufferBytes = descriptor.size | |
sdesc.lpwfxFormat = descriptor.format | |
_buffer = d.CreateCaptureBuffer(sdesc) | |
event = win32event.CreateEvent(None, 0, 0, None) | |
notify = _buffer.QueryInterface(directsound.IID_IDirectSoundNotify) | |
notify.SetNotificationPositions((directsound.DSBPN_OFFSETSTOP, event)) | |
self.device = d | |
self.sdesc = sdesc | |
self.descriptor = descriptor | |
self.buffer = _buffer | |
self.event = event | |
self.notify = notify | |
self.abort = abort | |
self.queue = queue | |
self.timeout = 2 * descriptor.milliseconds / 1000. | |
def run(self): | |
self.array = None | |
while not self.abort.isSet(): | |
self.buffer.Start(0) | |
win32event.WaitForSingleObject(self.event, -1) | |
data = self.buffer.Update(0, self.descriptor.size) | |
try: | |
array = np.frombuffer(data, dtype=self.descriptor.dtype).reshape(self.descriptor.shape) | |
self.queue.put( array.copy(), block=True, timeout=self.timeout ) | |
except Queue.Full: | |
# we might loose data on full play-back queue | |
# print 'full' | |
pass | |
self.array = array | |
if __name__ == '__main__': | |
from time import sleep | |
desc = BufferDescriptor(milliseconds=1000) | |
queue = Queue.Queue(maxsize=100) | |
abort_event = threading.Event() | |
recorder = DirectSoundRecorder(desc, queue, abort_event) | |
player = DirectSoundPlayer(desc, queue, abort_event) | |
print "recording" | |
abort_event.clear() | |
recorder.start() | |
sleep(1) | |
print "delayed playback" | |
abort_event.clear() | |
player.start() | |
sleep(5) | |
abort_event.set() | |
player.join() | |
recorder.join() | |
import matplotlib | |
import matplotlib.pyplot as plt | |
plt.figure() | |
plt.plot(recorder.array[:,0],'b') | |
plt.plot(recorder.array[:,1],'r') | |
plt.show() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment