Created
October 28, 2011 13:57
-
-
Save mohayonao/1322328 to your computer and use it in GitHub Desktop.
iPhone - WebSocket - Python で音を鳴らす
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import array
import math
import random
import threading

import pyaudio
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
# One full turn in radians; the sine wavetable is sampled over this period.
PI2 = math.pi * 2
# Sample rate in Hz requested from the audio output stream.
SAMPLERATE = 22050
# Number of samples in each precomputed single-cycle wavetable.
TABLE_LENGTH = 1024
# Number of frames produced per generator call and per mixing step.
STREAM_LENGTH = 32
# Waveform identifiers accepted by ToneGenerator.chwave().
SINE, TRI, SAW, SQUARE, NOISE = range(5)
class SoundSystem(threading.Thread):
    """Daemon thread that mixes tone generators into a PyAudio output stream.

    Every mixing step pulls one STREAM_LENGTH-sample cell from each registered
    ToneGenerator, sums the cells into a float32 buffer (interleaved
    left/right when opened with two channels), clips the sum to [-1.0, 1.0]
    and writes it to the output stream.
    """

    def __init__(self, channel=2):
        """Open the output stream.

        channel -- 2 selects stereo mixing; any other value falls back to
                   a single (mono) channel.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self._gens = set()
        if channel == 2:
            self.__iter__func = self._mix_stereo
        else:
            channel = 1
            self.__iter__func = self._mix_mono
        p = pyaudio.PyAudio()
        self.stream = p.open(rate=SAMPLERATE, channels=channel,
                             format=pyaudio.paFloat32, output=True)

    def _mix_stereo(self):
        """Return one interleaved L/R buffer of STREAM_LENGTH frames."""
        out = array.array('f', [0.0] * (STREAM_LENGTH * 2))
        # Iterate over a snapshot: add()/remove() may be called from another
        # thread while this one is mixing, and mutating a set during
        # iteration raises RuntimeError.
        for gen in tuple(self._gens):
            cell = gen.next()
            # With the pan at its centre value 0.5 both gains equal the
            # generator's amplitude.
            left = 2 * (gen._amp * (1.0 - gen._pan))
            right = 2 * (gen._amp * gen._pan)
            for i in range(STREAM_LENGTH):
                out[i * 2] += cell[i] * left
                out[i * 2 + 1] += cell[i] * right
        return out

    def _mix_mono(self):
        """Return one mono buffer of STREAM_LENGTH frames."""
        out = array.array('f', [0.0] * STREAM_LENGTH)
        # Snapshot for the same reason as in _mix_stereo().
        for gen in tuple(self._gens):
            cell = gen.next()
            amp = gen._amp
            for i in range(STREAM_LENGTH):
                out[i] += cell[i] * amp
        return out

    def add(self, gen):
        """Register a ToneGenerator; any other object is ignored."""
        if isinstance(gen, ToneGenerator):
            self._gens.add(gen)

    def remove(self, gen):
        """Unregister a generator; unknown generators are ignored."""
        self._gens.discard(gen)

    def __iter__(self):
        """Yield mixed buffers forever."""
        while True:
            yield self.next()

    def next(self):
        """Return the next mixed buffer for the configured channel layout."""
        return self.__iter__func()

    def run(self):
        """Thread body: mix, clip and play buffers until the process exits."""
        write = self.stream.write
        for stream in self:
            # Clip every sample. The stereo buffer holds 2 * STREAM_LENGTH
            # values, so the bound must come from the buffer itself.
            for i in range(len(stream)):
                if stream[i] < -1.0:
                    stream[i] = -1.0
                elif 1.0 < stream[i]:
                    stream[i] = +1.0
            # array.tostring() was removed in Python 3.9; older interpreters
            # (including Python 2) may only provide tostring().
            to_bytes = getattr(stream, 'tobytes', None) or stream.tostring
            write(to_bytes())
class ToneGenerator(object):
    """Wavetable oscillator producing STREAM_LENGTH-sample cells.

    The waveform is one of five precomputed TABLE_LENGTH-sample tables
    (sine, triangle, saw, square, noise). The pitch is a per-sample step
    through the table, set either from the _STEPS lookup (chtone) or
    directly from a frequency in Hz (chfreq).
    """

    def __cls_waves():
        """Build the five wavetables, ordered by waveform identifier."""
        def nearest(x):
            # Round half up. The built-in round() breaks ties differently on
            # Python 2 and Python 3, which would change the saw table at
            # phase 0.5; this matches the Python 2 result.
            return math.floor(x + 0.5)

        shapes = (
            lambda x: math.sin(PI2 * x),            # sine
            lambda x: 1 - 4 * abs(nearest(x) - x),  # triangle
            lambda x: 2.0 * (x - nearest(x)),       # saw
            lambda x: +1.0 if x < 0.5 else -1.0,    # square
            lambda x: random.uniform(-1.0, 1.0),    # noise
        )
        phases = [float(i) / TABLE_LENGTH for i in range(TABLE_LENGTH)]
        return tuple(tuple(f(x) for x in phases) for f in shapes)

    def __cls_steps(length, resolution):
        """Build the tone-index -> table-step lookup.

        The middle entry corresponds to 440 Hz; each neighbouring entry is
        1/resolution of an equal-tempered semitone away.
        """
        center = length >> 1

        def calcStep(i):
            freq = 440.0 * ((2.0 ** (1.0 / (12 * resolution))) ** (i - center))
            return TABLE_LENGTH * freq / SAMPLERATE

        return tuple(calcStep(i) for i in range(length))

    _WAVES = __cls_waves()
    _STEPS = __cls_steps(8192, 128)

    def __init__(self, wave=SINE, tone=None, freq=None, amp=None, pan=None):
        """Create a generator.

        wave -- waveform identifier (SINE, TRI, SAW, SQUARE or NOISE).
        tone -- optional index into the _STEPS pitch table.
        freq -- optional frequency in Hz; applied after tone, so it wins
                when both are given.
        amp  -- optional amplitude, clamped to [0.0, 1.0] (default 0.8).
        pan  -- optional pan, clamped to [0.0, 1.0] (default 0.5).
        """
        self._amp = 0.8
        self._pan = 0.5
        self._wave = None
        self._index = 0
        self._step = 0
        self.chwave(wave)
        if tone is not None:
            self.chtone(tone)
        if freq is not None:
            self.chfreq(freq)
        if amp is not None:
            self.champ(amp)
        if pan is not None:
            self.chpan(pan)
        # A noise generator with no pitch would repeat a single table entry;
        # advance one table entry per sample instead.
        if wave == NOISE and self._step == 0:
            self._step = 1

    def champ(self, amp):
        """Set the amplitude, clamped to [0.0, 1.0]."""
        self._amp = max(0.0, min(1.0, amp))

    def chpan(self, amp):
        """Set the pan, clamped to [0.0, 1.0].

        The parameter is named ``amp`` for backward compatibility; it holds
        the pan value.
        """
        self._pan = max(0.0, min(1.0, amp))

    def chwave(self, id):
        """Select the wavetable; unrecognised identifiers select the sine."""
        waves = ToneGenerator._WAVES
        for wave_id in (TRI, SAW, SQUARE, NOISE):
            if id == wave_id:
                self._wave = waves[wave_id]
                return
        self._wave = waves[SINE]

    def chtone(self, id):
        """Set the pitch from the _STEPS table; out-of-range ids are ignored."""
        if 0 <= id < len(ToneGenerator._STEPS):
            self._step = ToneGenerator._STEPS[id]

    def chfreq(self, freq):
        """Set the pitch from a frequency in Hz."""
        self._step = TABLE_LENGTH * float(freq) / SAMPLERATE

    def next(self):
        """Return the next STREAM_LENGTH samples as an array('f')."""
        wave, step, index = self._wave, self._step, self._index
        stream = array.array('f', [0.0] * STREAM_LENGTH)
        for i in range(STREAM_LENGTH):
            # The integer modulo guards the rare case where the float modulo
            # below rounds up to exactly TABLE_LENGTH.
            stream[i] = wave[int(index) % TABLE_LENGTH]
            # Wrap with modulo rather than a single subtraction so a step
            # larger than the table, or a negative step, cannot push the
            # index out of range.
            index = (index + step) % TABLE_LENGTH
        self._index = index
        return stream
def main():
    """Serve the controller page and turn device-motion messages into sound.

    Any path other than /ws/echo returns a small HTML page whose script
    streams devicemotion readings over a WebSocket. Each WebSocket client
    gets its own ToneGenerator whose pitch follows the second field of every
    message. The server listens on port 3000 until interrupted.
    """
    HTML = """
<script type="text/javascript">
window.onload = function() {
    var ws = new WebSocket("ws://" + location.host + "/ws/echo");
    window.addEventListener('devicemotion', function(evt) {
        var ac = evt.acceleration;
        var acg = evt.accelerationIncludingGravity;
        ws.send(ac.x + "," + acg.x + "," + ac.y + "," + acg.y + "," + ac.z + "," + acg.z);
    });
};
</script>
"""
    soundsystem = SoundSystem()

    def handle(environ, start_response):
        """WSGI application: serve the page or run one WebSocket session."""
        path = environ["PATH_INFO"]
        if path != '/ws/echo':
            start_response("200 OK", [("Content-Type", "text/html"),
                                      ("Content-Length", str(len(HTML)))])
            return iter([HTML])

        # A plain HTTP request to the WebSocket path carries no socket;
        # reject it before a generator is registered.
        ws = environ.get("wsgi.websocket")
        if ws is None:
            body = b"WebSocket connection expected\n"
            start_response("400 Bad Request",
                           [("Content-Type", "text/plain"),
                            ("Content-Length", str(len(body)))])
            return [body]

        base = random.choice([7000, 6000, 5000, 4000])
        width = random.choice([32, 48, 64, 128])

        t = ToneGenerator()
        t.chwave(random.choice([SAW, TRI]))
        # chpan() clamps to [0.0, 1.0] with 0.5 as the centre, so draw the
        # pan from inside that range; a negative value would be clamped to
        # hard left.
        t.chpan(random.uniform(0.1, 0.9))
        soundsystem.add(t)
        print("connect")
        try:
            # NOTE(review): ws.wait() is the receive call this file was
            # written against; confirm it exists in the installed
            # gevent-websocket version.
            while True:
                msg = ws.wait()
                if msg is None:
                    break
                try:
                    params = [float(x) for x in msg.split(",")]
                    tone = int(params[1] * width) + base
                except (ValueError, IndexError, OverflowError):
                    # Malformed reading (e.g. "null" fields): skip it
                    # instead of killing the session.
                    continue
                t.chtone(tone)
                print(params[0])
        finally:
            # Always unregister, otherwise the tone keeps playing after
            # the client is gone.
            soundsystem.remove(t)
            print("disconnect")

    soundsystem.start()
    socketserver = pywsgi.WSGIServer(("", 3000), handle,
                                     handler_class=WebSocketHandler)
    try:
        socketserver.serve_forever()
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment