Skip to content

Instantly share code, notes, and snippets.

@mohayonao
Created October 28, 2011 13:57
Show Gist options
  • Save mohayonao/1322328 to your computer and use it in GitHub Desktop.
Save mohayonao/1322328 to your computer and use it in GitHub Desktop.
iPhone - WebSocket - Python で音を鳴らす
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import math, array, random
import threading
import pyaudio
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
# One full turn in radians; multiplies the phase when the sine table is built.
PI2 = 2 * math.pi
# Sample rate (Hz) handed to the audio output stream and used to convert
# frequencies into wavetable steps.
SAMPLERATE = 22050
# Number of samples in one cycle of every precomputed wavetable.
TABLE_LENGTH = 1024
# Number of samples (per channel) produced by each generator call.
STREAM_LENGTH = 32
# Waveform ids accepted by ToneGenerator.chwave(); they index the wavetables.
SINE = 0
TRI = 1
SAW = 2
SQUARE = 3
NOISE = 4
class SoundSystem(threading.Thread):
    """Daemon thread that mixes tone generators and plays them via PyAudio.

    Each registered generator is asked for one block of STREAM_LENGTH
    samples per mixing pass (through its ``next()`` method); the blocks are
    scaled by the generator's ``_amp`` (and ``_pan`` in stereo), summed,
    clipped to [-1.0, 1.0] and written to the output stream as float32.

    channel: 2 selects interleaved stereo output; any other value falls
        back to mono.
    """

    def __init__(self, channel=2):
        threading.Thread.__init__(self)
        self.daemon = True
        self._gens = set()
        if channel == 2:
            self._mix = self._mix_stereo
        else:
            channel = 1
            self._mix = self._mix_mono
        # Keep the PyAudio instance referenced for as long as the stream is.
        self._audio = pyaudio.PyAudio()
        self.stream = self._audio.open(rate=SAMPLERATE, channels=channel,
                                       format=pyaudio.paFloat32, output=True)

    def _mix_stereo(self):
        """Return one interleaved L/R block (2 * STREAM_LENGTH samples)."""
        out = array.array('f', [0] * STREAM_LENGTH * 2)
        # Iterate over a snapshot: add()/remove() may be called from another
        # thread, and mutating a set during iteration raises RuntimeError.
        for gen in tuple(self._gens):
            cell = gen.next()
            left = 2 * (gen._amp * (1.0 - gen._pan))
            right = 2 * (gen._amp * gen._pan)
            for i in range(STREAM_LENGTH):
                out[i * 2] += cell[i] * left
                out[i * 2 + 1] += cell[i] * right
        return out

    def _mix_mono(self):
        """Return one mono block (STREAM_LENGTH samples)."""
        out = array.array('f', [0] * STREAM_LENGTH)
        # Snapshot for the same reason as in _mix_stereo.
        for gen in tuple(self._gens):
            cell = gen.next()
            amp = gen._amp
            for i in range(STREAM_LENGTH):
                out[i] += cell[i] * amp
        return out

    def add(self, gen):
        """Register *gen* for playback; non-ToneGenerator objects are ignored."""
        if isinstance(gen, ToneGenerator):
            self._gens.add(gen)

    def remove(self, gen):
        """Stop playing *gen*; unknown generators are ignored."""
        self._gens.discard(gen)

    def __iter__(self):
        """Yield mixed blocks forever."""
        while True:
            yield self.next()

    def next(self):
        """Mix and return the next block of samples."""
        return self._mix()

    def run(self):
        """Thread body: clip every mixed block and write it to the stream."""
        for block in self:
            # Clip the whole buffer; a stereo block is 2 * STREAM_LENGTH long,
            # so the loop must follow the block's actual length.
            for i, sample in enumerate(block):
                if sample < -1.0:
                    block[i] = -1.0
                elif sample > 1.0:
                    block[i] = 1.0
            # array.tostring() no longer exists on Python 3.9+; older
            # interpreters without tobytes() still use tostring().
            encode = getattr(block, "tobytes", None) or block.tostring
            self.stream.write(encode())
class ToneGenerator(object):
    """Wavetable oscillator that produces blocks of STREAM_LENGTH samples.

    A generator reads one of the precomputed single-cycle tables in
    ``_WAVES`` (indexed by the SINE/TRI/SAW/SQUARE/NOISE ids) and advances
    its phase by ``_step`` table positions per output sample.

    wave: waveform id; unknown ids select the sine table.
    tone: optional index into the ``_STEPS`` pitch table.
    freq: optional frequency in Hz (applied after *tone*).
    amp:  optional amplitude, clamped to [0.0, 1.0] (default 0.8).
    pan:  optional pan position, clamped to [0.0, 1.0] (default 0.5).
    """

    # The two helpers below are plain functions that run once while the class
    # body executes, to build the lookup tables. They are not usable as methods.
    def __cls_waves():
        """Return one TABLE_LENGTH-sample cycle per waveform, in id order."""
        def nearest(x):
            # Round half up. For the non-negative phases used here this is
            # what Python 2's round() returns; Python 3's round() rounds
            # halves to even and would change the table at phase 0.5.
            return math.floor(x + 0.5)

        shapes = (
            lambda x: math.sin(PI2 * x),               # SINE
            lambda x: 1 - 4 * abs(nearest(x) - x),     # TRI
            lambda x: 2.0 * (x - nearest(x)),          # SAW
            lambda x: +1.0 if x < 0.5 else -1.0,       # SQUARE
            lambda x: random.uniform(-1.0, 1.0),       # NOISE
        )
        phases = [float(i) / TABLE_LENGTH for i in range(TABLE_LENGTH)]
        return tuple(tuple(shape(x) for x in phases) for shape in shapes)

    def __cls_steps(length, resolution):
        """Return *length* phase steps, equal-tempered around 440 Hz.

        Entry ``length // 2`` corresponds to 440 Hz; each index is
        1/resolution of a semitone away from its neighbour.
        """
        center = length >> 1
        ratio = 2.0 ** (1.0 / (12 * resolution))

        def calc_step(i):
            freq = 440.0 * (ratio ** (i - center))
            return TABLE_LENGTH * freq / SAMPLERATE

        return tuple(calc_step(i) for i in range(length))

    _WAVES = __cls_waves()
    _STEPS = __cls_steps(8192, 128)

    def __init__(self, wave=SINE, tone=None, freq=None, amp=None, pan=None):
        self._amp = 0.8
        self._pan = 0.5
        self._wave = None
        self._index = 0
        self._step = 0
        self.chwave(wave)
        if tone is not None:
            self.chtone(tone)
        if freq is not None:
            self.chfreq(freq)
        if amp is not None:
            self.champ(amp)
        if pan is not None:
            self.chpan(pan)
        # A noise generator with a zero step would repeat a single sample.
        if wave == NOISE and self._step == 0:
            self._step = 1

    def champ(self, amp):
        """Set the amplitude, clamped to [0.0, 1.0]."""
        self._amp = max(0.0, min(1.0, amp))

    def chpan(self, amp):
        """Set the pan position, clamped to [0.0, 1.0].

        The parameter is named ``amp`` for backward compatibility; it is the
        pan value.
        """
        self._pan = max(0.0, min(1.0, amp))

    def chwave(self, id):
        """Select the waveform table; unknown ids fall back to sine."""
        index = id if id in (TRI, SAW, SQUARE, NOISE) else SINE
        self._wave = ToneGenerator._WAVES[int(index)]

    def chtone(self, id):
        """Set the pitch from the ``_STEPS`` table; out-of-range ids are ignored."""
        if 0 <= id < len(ToneGenerator._STEPS):
            self._step = ToneGenerator._STEPS[id]

    def chfreq(self, freq):
        """Set the pitch from a frequency in Hz."""
        self._step = TABLE_LENGTH * float(freq) / SAMPLERATE

    def next(self):
        """Return the next STREAM_LENGTH samples as an ``array('f')``."""
        wave, index, step = self._wave, self._index, self._step
        block = array.array('f', [0] * STREAM_LENGTH)
        for i in range(STREAM_LENGTH):
            # Both modulo operations keep the lookup inside the table even
            # when the step is negative or larger than TABLE_LENGTH (a single
            # subtraction would eventually index out of range).
            block[i] = wave[int(index) % TABLE_LENGTH]
            index = (index + step) % TABLE_LENGTH
        self._index = index
        return block
def main():
    """Run the demo: serve the controller page and play one tone per client.

    Any path other than ``/ws/echo`` returns a small HTML page whose script
    streams ``devicemotion`` readings over a WebSocket. Each WebSocket
    connection gets its own ToneGenerator; the second value of every message
    (``accelerationIncludingGravity.x``) selects the tone. The server listens
    on port 3000 until interrupted.
    """
    HTML = """
<script type="text/javascript">
window.onload = function() {
var ws = new WebSocket("ws://" + location.host + "/ws/echo");
window.addEventListener('devicemotion', function(evt) {
var ac = evt.acceleration;
var acg = evt.accelerationIncludingGravity;
ws.send(ac.x + "," + acg.x + "," + ac.y + "," + acg.y + "," + ac.z + "," + acg.z);
});
};
</script>
"""
    soundsystem = SoundSystem()

    def handle(environ, start_response):
        if environ["PATH_INFO"] != '/ws/echo':
            # Content-Length must describe the encoded body.
            body = HTML.encode("utf-8")
            start_response("200 OK", [("Content-Type", "text/html"),
                                      ("Content-Length", str(len(body)))])
            return iter([body])

        # Look the socket up before registering a tone, so a plain HTTP
        # request to this path cannot leave a generator playing.
        ws = environ.get("wsgi.websocket")
        if ws is None:
            start_response("400 Bad Request", [("Content-Type", "text/plain"),
                                               ("Content-Length", "0")])
            return iter([])

        base = random.choice([7000, 6000, 5000, 4000])
        width = random.choice([32, 48, 64, 128])
        t = ToneGenerator()
        t.chwave(random.choice([SAW, TRI]))
        # chpan() clamps to [0.0, 1.0] with 0.5 as centre; the former range
        # of (-0.8, 0.8) collapsed about half of all clients to hard left.
        t.chpan(random.uniform(0.1, 0.9))
        soundsystem.add(t)
        print("connect")
        try:
            while True:
                msg = ws.wait()
                if msg is None:
                    break
                try:
                    params = [float(x) for x in msg.split(",")]
                    tone = int(params[1] * width) + base
                except (ValueError, IndexError, OverflowError):
                    # Malformed or non-numeric reading (e.g. "null"): skip it
                    # rather than dropping the connection.
                    continue
                t.chtone(tone)
                print(params[0])
        finally:
            # Always silence this client's tone, even if the socket errors.
            soundsystem.remove(t)
            print("disconnect")

    soundsystem.start()
    socketserver = pywsgi.WSGIServer(("", 3000), handle,
                                     handler_class=WebSocketHandler)
    try:
        socketserver.serve_forever()
    except KeyboardInterrupt:
        pass
# Start the demo server only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment