Speech Streaming with Python & GRPC
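Two files: an HTML page that captures microphone audio with the Web Audio API, converts it to 16-bit linear PCM in the browser, and streams it over a WebSocket; and a Python 3 server that relays that audio to the Google Cloud Speech streaming (gRPC) API and sends interim transcripts back to the page as JSON.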
<!DOCTYPE html>
<html>
  <head>
  </head>
  <body>
    <h2>Transcript</h2>
    <div id="transcript"></div>
    <script>
      var app = {
        socket: null,
        mediaTrack: null,
        counter: 0,
        bufferSize: 4096,

        main: function() {
          this.socket = new WebSocket("ws://35.188.93.150:80");
          this.socket.addEventListener("open", this.onSocketOpen.bind(this));
          this.socket.addEventListener("message", this.onSocketMessage.bind(this));
        },

        onSocketOpen: function(event) {
          this.initRecorder();
          console.log("onSocketOpen", event);
        },

        onSocketMessage: function(event) {
          console.log("Message", event.data);
          document.getElementById("transcript").innerHTML += "<p>" + event.data + "</p>";
        },

        shimAudioContext: function() {
          try {
            // Shims for older browsers
            window.AudioContext = window.AudioContext || window.webkitAudioContext;
            navigator.getUserMedia = navigator.getUserMedia ||
                                     navigator.webkitGetUserMedia ||
                                     navigator.mozGetUserMedia ||
                                     navigator.msGetUserMedia;
          } catch (e) {
            alert("Your browser is not supported");
            return false;
          }
          if (!navigator.getUserMedia || !window.AudioContext) {
            alert("Your browser is not supported");
            return false;
          }
          return true;
        },

        initRecorder: function() {
          // Shim the audio context
          if (!this.shimAudioContext()) {
            return;
          }
          return navigator.mediaDevices.getUserMedia({"audio": true, "video": false}).then((stream) => {
            var context = new window.AudioContext();
            // Send metadata about the audio stream to the backend
            this.socket.send(JSON.stringify({
              rate: context.sampleRate,
              language: "en-US",
              format: "LINEAR16"
            }));
            // Capture mic audio data into a stream
            var audioInput = context.createMediaStreamSource(stream);
            // Only record mono audio, with a buffer of 4096 samples per callback
            var recorder = context.createScriptProcessor(this.bufferSize, 1, 1);
            // Specify the processing function
            recorder.onaudioprocess = this.audioProcess.bind(this);
            // Connect the stream to our recorder
            audioInput.connect(recorder);
            // Connect the recorder to the previous destination
            recorder.connect(context.destination);
            // Store the media track
            this.mediaTrack = stream.getTracks()[0];
          });
        },

        float32To16BitPCM: function(float32Arr) {
          var pcm16bit = new Int16Array(float32Arr.length);
          for (var i = 0; i < float32Arr.length; ++i) {
            // Clamp the sample to [-1, 1]
            var s = Math.max(-1, Math.min(1, float32Arr[i]));
            /**
             * Convert 32-bit float to 16-bit int PCM audio:
             * scale by 0x8000 for negative samples (int16 min is -0x8000)
             * and by 0x7FFF for positive samples (int16 max is 0x7FFF).
             */
            pcm16bit[i] = s < 0 ? s * 0x8000 : s * 0x7FFF;
          }
          return pcm16bit;
        },

        audioProcess: function(event) {
          // Only 1 channel, as configured in createScriptProcessor above
          var float32Audio = event.inputBuffer.getChannelData(0) || new Float32Array(this.bufferSize);
          var pcm16Audio = this.float32To16BitPCM(float32Audio);
          this.socket.send(pcm16Audio.buffer);
        }
      };

      app.main();
    </script>
  </body>
</html>
#!/usr/bin/python3
import asyncio
import websockets
import json
import io
import threading
import queue

from google.cloud import speech
from google.cloud.gapic.speech.v1 import speech_client
from google.cloud.proto.speech.v1 import cloud_speech_pb2


class StreamingRequest(object):
    """A streaming request iterable for the speech API."""

    def __init__(self, audio_stream, config):
        """Initializes the streaming request obj.

        params:
            audio_stream: An AudioStream obj
            config: The protobuf configuration for the API call
        """
        self.audio_stream = audio_stream
        self.config = config
        self.is_first = True

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def next(self):
        """Generate the next gRPC streaming API request."""
        if self.audio_stream.closed:
            # Stop iterating once the audio stream has been closed.
            raise StopIteration
        if self.is_first:
            # The first request carries only the streaming config.
            self.is_first = False
            return cloud_speech_pb2.StreamingRecognizeRequest(
                streaming_config=self.config)
        # Block until some data is read or the stream is closed.
        data = self.audio_stream.read()
        while not self.audio_stream.closed and len(data) == 0:
            data = self.audio_stream.read()
        return cloud_speech_pb2.StreamingRecognizeRequest(
            audio_content=data)


def results_to_dict(results):
    """Converts a StreamingRecognizeResponse into a JSON-serializable list."""
    if results is None:
        return []
    output = []
    for result in results.results:
        r = {}
        r["stability"] = result.stability
        r["is_final"] = False
        if result.is_final:
            r["is_final"] = True
        r["alternatives"] = []
        for alternative in result.alternatives:
            r["alternatives"].append({
                "transcript": alternative.transcript,
                "confidence": alternative.confidence,
            })
        output.append(r)
    return output


class AudioStream(io.BytesIO):
    """A BytesIO whose read() returns only data written since the last read."""

    def read(self, n=None):
        """Reads up to `n` bytes of not-yet-read data."""
        if not hasattr(self, "_position"):
            self._position = 0
        self.seek(self._position)
        data = super(AudioStream, self).read(n)
        self._position += len(data)
        return data


class Transcoder(object):
    """Streams chunks of audio to the speech API and queues the results."""

    def __init__(self, encoding, rate, language):
        self.encoding = encoding
        self.rate = rate
        self.language = language
        self.audio = AudioStream()
        self.results = queue.Queue()

    def start(self):
        """Starts up the streaming speech call."""
        threading.Thread(target=self._process).start()

    def write(self, data):
        """Sends a chunk of audio to the speech API."""
        self.audio.write(data)

    def get_result(self):
        """Gets a result from the streaming API, or None if nothing is pending."""
        try:
            return self.results.get(False)
        except queue.Empty:
            return None

    def _process(self):
        """Opens a streaming speech API request and pushes responses onto the queue."""
        self.client = speech_client.SpeechClient()
        self.config = cloud_speech_pb2.StreamingRecognitionConfig(
            config=cloud_speech_pb2.RecognitionConfig(
                encoding=self.encoding,
                sample_rate_hertz=self.rate,
                language_code=self.language),
            interim_results=True)
        requests = StreamingRequest(self.audio, self.config)
        streaming_resp = self.client.streaming_recognize(iter(requests))
        # This blocks until self.audio is closed, which ends the streaming_recognize request.
        for resp in streaming_resp:
            self.results.put(resp)


@asyncio.coroutine
def audioin(websocket, path):
    # The first message should be the config.
    config = yield from websocket.recv()
    if not isinstance(config, str):
        print("ERROR, no config")
        yield from websocket.send(
            json.dumps({"error": "configuration not received as first message"}))
        return
    config = json.loads(config)

    transcoder = Transcoder(
        encoding=config["format"],
        rate=config["rate"],
        language=config["language"],
    )
    # Start the transcoding
    transcoder.start()

    # Process incoming audio packets
    while True:
        data = yield from websocket.recv()
        transcoder.write(data)

        # Check for new results
        result = transcoder.get_result()
        result_dict = results_to_dict(result)
        result_json = json.dumps(result_dict)
        print(result_dict)
        yield from websocket.send(result_json)


start_server = websockets.serve(audioin, "0.0.0.0", 80)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
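The browser page is the intended client, but the wire protocol is simple enough to exercise directly from Python: one JSON text frame carrying rate, language, and format, followed by binary frames of LINEAR16 audio, with a JSON list of results coming back after each audio frame. The sketch below is a hypothetical test client built on the same websockets library the server uses; the WAV file name and the localhost URI are assumptions, not part of the gist.

#!/usr/bin/python3
# Hypothetical test client -- not part of the original gist.
# It speaks the same protocol as the browser page: one JSON config frame,
# then binary LINEAR16 frames, reading a JSON result list back per frame.
import asyncio
import json
import wave

import websockets

WAV_PATH = "sample.wav"           # assumption: a 16-bit mono PCM WAV file
SERVER_URI = "ws://localhost:80"  # assumption: the server above, run locally


async def stream_wav():
    wav = wave.open(WAV_PATH, "rb")
    async with websockets.connect(SERVER_URI) as ws:
        # First frame: the config JSON that audioin() expects.
        await ws.send(json.dumps({
            "rate": wav.getframerate(),
            "language": "en-US",
            "format": "LINEAR16",
        }))
        # Then binary frames of raw 16-bit PCM, 4096 samples at a time,
        # mirroring the browser's ScriptProcessor buffer size.
        chunk = wav.readframes(4096)
        while chunk:
            await ws.send(chunk)
            # The server replies with a JSON list of interim/final results
            # after every audio frame it receives.
            print(await ws.recv())
            chunk = wav.readframes(4096)


asyncio.get_event_loop().run_until_complete(stream_wav())

Each printed message should be the JSON produced by results_to_dict(); empty lists simply mean no new interim result was queued when that frame arrived.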
Does this actually work? Is performance good on the Google side?