Created
July 20, 2016 23:49
-
-
Save cristianmiranda/89790ddd10cfe3149ca43a06696fb994 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# open a microphone in pyAudio and listen for taps | |
# brew install portaudio | |
# pip install --allow-external pyaudio --allow-unverified pyaudio pyaudio | |
import pyaudio | |
import struct | |
import math | |
import time | |
import requests | |
INITIAL_TAP_THRESHOLD = 0.050 | |
FORMAT = pyaudio.paInt16 | |
SHORT_NORMALIZE = (1.0/32768.0) | |
CHANNELS = 2 | |
RATE = 44100 | |
INPUT_BLOCK_TIME = 0.05 | |
INPUT_FRAMES_PER_BLOCK = int(RATE*INPUT_BLOCK_TIME) | |
# if we get this many noisy blocks in a row, increase the threshold | |
OVERSENSITIVE = 15.0/INPUT_BLOCK_TIME | |
# if we get this many quiet blocks in a row, decrease the threshold | |
UNDERSENSITIVE = 120.0/INPUT_BLOCK_TIME | |
# if the noise was longer than this many blocks, it's not a 'tap' | |
MAX_TAP_BLOCKS = 0.15/INPUT_BLOCK_TIME | |
ALLOWED_DIFF_BETWEEN_CLAPS = 1000 | |
def get_rms( block ): | |
# RMS amplitude is defined as the square root of the | |
# mean over time of the square of the amplitude. | |
# so we need to convert this string of bytes into | |
# a string of 16-bit samples... | |
# we will get one short out for each | |
# two chars in the string. | |
count = len(block)/2 | |
format = "%dh"%(count) | |
shorts = struct.unpack( format, block ) | |
# iterate over the block. | |
sum_squares = 0.0 | |
for sample in shorts: | |
# sample is a signed short in +/- 32768. | |
# normalize it to 1.0 | |
n = sample * SHORT_NORMALIZE | |
sum_squares += n*n | |
return math.sqrt( sum_squares / count ) | |
class TapTester(object): | |
def __init__(self): | |
self.pa = pyaudio.PyAudio() | |
self.stream = self.open_mic_stream() | |
self.tap_threshold = INITIAL_TAP_THRESHOLD | |
self.noisycount = MAX_TAP_BLOCKS+1 | |
self.quietcount = 0 | |
self.errorcount = 0 | |
self.last_timestamp = 0 | |
def stop(self): | |
self.stream.close() | |
def find_input_device(self): | |
device_index = None | |
for i in range( self.pa.get_device_count() ): | |
devinfo = self.pa.get_device_info_by_index(i) | |
print( "Device %d: %s"%(i,devinfo["name"]) ) | |
for keyword in ["mic","input"]: | |
if keyword in devinfo["name"].lower(): | |
print( "Found an input: device %d - %s"%(i,devinfo["name"]) ) | |
device_index = i | |
return device_index | |
if device_index == None: | |
print( "No preferred input found; using default input device." ) | |
return device_index | |
def open_mic_stream( self ): | |
device_index = self.find_input_device() | |
stream = self.pa.open( format = FORMAT, | |
channels = CHANNELS, | |
rate = RATE, | |
input = True, | |
input_device_index = device_index, | |
frames_per_buffer = INPUT_FRAMES_PER_BLOCK) | |
return stream | |
def tapDetected(self): | |
global consecutive_taps | |
millis = int(round(time.time() * 1000)) | |
diff = millis - self.last_timestamp | |
# Reset consecutive timestamp | |
if millis - self.last_timestamp > ALLOWED_DIFF_BETWEEN_CLAPS: | |
self.last_timestamp = 0 | |
if not self.last_timestamp: | |
self.last_timestamp = millis | |
print "Tap!" | |
elif diff <= ALLOWED_DIFF_BETWEEN_CLAPS: | |
print "Consecutive Tap!. Diff %d" % diff | |
self.last_timestamp = 0 | |
self.toggle_light(self.get_light_id()) | |
else: | |
self.last_timestamp = 0 | |
print "Tap!" | |
def listen(self): | |
try: | |
block = self.stream.read(INPUT_FRAMES_PER_BLOCK) | |
except IOError, e: | |
# dammit. | |
self.errorcount += 1 | |
print( "(%d) Error recording: %s"%(self.errorcount,e) ) | |
self.noisycount = 1 | |
time.sleep(2) | |
#raise | |
amplitude = get_rms( block ) | |
if amplitude > self.tap_threshold: | |
# noisy block | |
self.quietcount = 0 | |
self.noisycount += 1 | |
if self.noisycount > OVERSENSITIVE: | |
# turn down the sensitivity | |
self.tap_threshold *= 1.1 | |
else: | |
# quiet block. | |
if 1 <= self.noisycount <= MAX_TAP_BLOCKS: | |
self.tapDetected() | |
self.noisycount = 0 | |
self.quietcount += 1 | |
if self.quietcount > UNDERSENSITIVE: | |
# turn up the sensitivity | |
self.tap_threshold *= 0.9 | |
''' | |
LIFX API methods | |
''' | |
def get_light_id(self): | |
return 'd073d5123254' | |
def get_token(self): | |
return 'cb3d4e8f6494e59f09913a6cb42df876d5623ad46f0653fd063c59029c7a25b5' | |
def _do_lifx_auth_post(self, url, data): | |
try: | |
token = self.get_token() | |
headers = { | |
'Content-Type': 'application/json', | |
'Authorization': 'Bearer ' + token, | |
} | |
if data: | |
response = requests.post(url, data=data, headers=headers) | |
else: | |
response = requests.post(url, headers=headers) | |
print 'Response: {0}'.format(response) | |
return response | |
except requests.HTTPError as e: | |
print 'Unable to submit post data {url} - {error}'.format(url=url, error=e.reason) | |
raise | |
def toggle_light(self, light_id): | |
print 'Toggle State to light {light_id}.'.format(light_id=light_id) | |
url = 'https://api.lifx.com/v1/lights/id:' + light_id + '/toggle' | |
try: | |
return self._do_lifx_auth_post(url, {}) | |
except: | |
return None | |
if __name__ == "__main__": | |
tt = TapTester() | |
while True: | |
time.sleep(0.05) | |
try: | |
tt.listen() | |
except: | |
print 'Error. Resetting stream...' | |
tt.stop() | |
tt = TapTester() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment