|
#!/usr/bin/env python |
|
# encoding: utf-8 |
|
|
|
from __future__ import unicode_literals, print_function |
|
|
|
import sys |
|
import os |
|
import atexit |
|
import time |
|
|
|
import numpy as np |
|
import pyaudio |
|
|
|
|
|
class Spectrum(object):
    """Capture audio from an input device and print a text spectrogram.

    Every captured buffer of ``FRAME_LEN`` float32 samples is multiplied by
    a Hamming window, transformed with :meth:`fft` and printed as one line
    of characters taken from ``PATTERNS``.  :meth:`cepstrum` is provided as
    an alternative transform but is not called by :meth:`callback`.
    """

    FORMAT = pyaudio.paFloat32    # sample format passed to PyAudio.open()
    CHANNELS = 1                  # channel count passed to PyAudio.open()
    RATE = 16000                  # sampling rate passed to PyAudio.open()
    FRAME_LEN = 512               # samples per buffer, and window length
    DEVICE_NAME = 'USB Audio'     # substring looked for in device names
    PATTERNS = ' .,:;|#@89'       # output characters, indexed by level
    OUTPUT_WIDTH = 200            # maximum number of values kept per line

    def __init__(self):
        """Create the PyAudio instance and look for the input device.

        Devices are scanned in index order; every device whose name
        contains ``DEVICE_NAME`` and that reports at least one input
        channel is announced with ``print``, and the last such device is
        selected.  When no device matches, ``input_device_index`` stays
        ``None`` and is passed as such to ``PyAudio.open()``.
        """
        self.pa = pyaudio.PyAudio()
        self.last_samples = None
        # Flag returned from callback(); record() resets it on every run.
        self.recording = pyaudio.paContinue
        atexit.register(self.pa.terminate)

        self.input_device_index = None
        # range() rather than xrange() so this runs on Python 2 and 3.
        for index in range(self.pa.get_device_count()):
            dev = self.pa.get_device_info_by_index(index)
            if (self.DEVICE_NAME in dev.get('name', '')
                    and dev.get('maxInputChannels', 0) > 0):
                self.input_device_index = index
                print('using %r' % dev)

        self.win = np.hamming(self.FRAME_LEN)

    def fft(self, samples):
        """Return the log power spectrum of one frame.

        :param samples: array of ``FRAME_LEN`` samples (it is multiplied
            element-wise by the Hamming window).
        :return: array of at most ``OUTPUT_WIDTH`` values
            ``log(|X[k]|**2) + 10`` taken from the first half of the FFT.
        """
        res = np.fft.fft(self.win * samples)
        # Floor division: a slice bound must be an int on Python 3 too.
        res = res[:len(res) // 2]
        res = np.log(np.abs(res) ** 2)
        res += 10
        return res[:self.OUTPUT_WIDTH]

    def cepstrum(self, samples):
        """Return a slice of the real cepstrum of one frame.

        The frame is windowed, its log power spectrum is inverse
        transformed, and the real part of the first half is kept.

        :param samples: array of ``FRAME_LEN`` samples.
        :return: up to ``OUTPUT_WIDTH`` coefficients starting at index 70,
            each multiplied by 30.
        """
        res = np.fft.fft(self.win * samples)
        res = np.log(np.abs(res) ** 2)
        res = np.fft.ifft(res)
        # Floor division: a slice bound must be an int on Python 3 too.
        res = res[:len(res) // 2]
        res = np.real(res)
        p = 70
        res = res[p:p + self.OUTPUT_WIDTH]
        res *= 30.0
        return res

    def valueToChar(self, value):
        """Map a numeric level to one character of ``PATTERNS``.

        The value is truncated with ``int()`` and clamped to the valid
        index range of ``PATTERNS``.

        :param value: number to convert.
        :return: a single character; ``' '`` when the value cannot be
            converted to an integer.
        """
        try:
            level = int(value)
        except (ValueError, OverflowError):
            # int() raises ValueError for NaN and OverflowError for +/-inf
            # (e.g. log(0) on a silent frame).  The tuple is required:
            # "except ValueError, OverflowError" only caught ValueError.
            return ' '
        top = len(self.PATTERNS) - 1
        return self.PATTERNS[min(top, max(0, level))]

    def callback(self, in_data, frame_count, time_info, status):
        """PyAudio stream callback: print one spectrum line per buffer.

        :param in_data: raw bytes of the captured buffer, read as float32.
        :param frame_count: unused.
        :param time_info: unused.
        :param status: unused.
        :return: ``(in_data, self.recording)``; the second item is the
            flag that tells PyAudio whether to keep the stream running.
        """
        data = np.frombuffer(in_data, dtype=np.float32)
        # Buffers of unexpected length are skipped, but the callback still
        # has to hand PyAudio a (data, flag) tuple rather than None.
        if len(data) == self.FRAME_LEN:
            levels = self.fft(data)
            print(''.join([self.valueToChar(v) for v in levels]))
        return (in_data, self.recording)

    def record(self):
        """Open the input stream and block until it becomes inactive.

        Ctrl-C (``KeyboardInterrupt``) while waiting switches the flag
        returned by :meth:`callback` to ``pyaudio.paAbort``; the loop ends
        once the stream reports that it is no longer active.  The stream
        is stopped and closed on the way out, including on error.
        """
        self.recording = pyaudio.paContinue
        stream = self.pa.open(format=self.FORMAT,
                              channels=self.CHANNELS,
                              rate=self.RATE,
                              input=True,
                              input_device_index=self.input_device_index,
                              output=False,
                              frames_per_buffer=self.FRAME_LEN,
                              stream_callback=self.callback)
        try:
            stream.start_stream()
            while stream.is_active():
                try:
                    time.sleep(1)
                except KeyboardInterrupt:
                    self.recording = pyaudio.paAbort
        finally:
            # Stop (not start) the stream before releasing it.
            stream.stop_stream()
            stream.close()
|
|
|
if __name__ == '__main__':
    # Run as a script: build the analyser, then capture and print
    # spectrum lines until the stream ends.
    analyzer = Spectrum()
    analyzer.record()