|
import json |
|
import queue |
|
import threading |
|
from tornado import websocket, web, ioloop, httpserver |
|
from google.cloud import speech |
|
from google.cloud import logging |
|
|
|
# Module-wide structured logger. ``logging`` here is ``google.cloud.logging``
# (see the imports), not the standard-library module of the same name.
_logging_client = logging.Client()
logger = _logging_client.logger("speech-poc")
|
|
|
class IndexHandler(web.RequestHandler):
    """HTTP handler that answers GET requests with the ``index.html`` template."""

    def get(self):
        """Render ``index.html`` as the response body."""
        self.render(template_name="index.html")
|
|
|
|
|
class AudioWebSocket(websocket.WebSocketHandler):
    """WebSocket endpoint that forwards audio to a ``Transcoder``.

    Protocol, as implemented by ``on_message``:

    1. The first message on a connection is a JSON document containing the
       keys ``sampleRateHz`` and ``language``; it creates and starts the
       transcoder.
    2. Every later message is passed to the transcoder as an audio chunk.
       Any results available at that point are sent back as a JSON array.
    """

    def check_origin(self, origin):
        """Accept WebSocket connections from any origin."""
        # NOTE(review): returning True unconditionally disables the
        # cross-origin check -- confirm this is intended outside of a PoC.
        return True

    def open(self):
        """Initialise per-connection state when the socket is opened."""
        # The transcoder is created lazily, on the first (config) message.
        self.transcoder = None
        print("WebSocket opened")

    def on_message(self, message):
        """Handle one incoming message (config first, then audio chunks).

        Raises:
            ValueError: if the first message is not valid JSON.
            KeyError: if the config lacks ``sampleRateHz`` or ``language``.
        """
        if self.transcoder is None:
            config = json.loads(message)
            print(config)

            self.transcoder = Transcoder(
                sample_rate=config["sampleRateHz"],
                language_code=config["language"])

            # Start the transcoding of audio to text
            self.transcoder.start()
            return

        self.transcoder.write(message)
        interim_results = self.transcoder.interim_results()
        if interim_results:
            print(interim_results)
            logger.log_struct({
                "request": "/translate",
                "interim_results": interim_results})
            self.write_message(json.dumps(interim_results))

    def on_close(self):
        """Stop the transcoder, if one was ever started, and drop it."""
        # The socket may close before the config message arrived, in which
        # case there is no transcoder to stop.
        transcoder = getattr(self, "transcoder", None)
        if transcoder is not None:
            transcoder.stop()
        self.transcoder = None
|
|
|
|
|
class AudioStream(object):
    """An iterable object which holds audio data that is pending processing.

    Producers call ``write`` to enqueue chunks; a consumer iterates over the
    stream and blocks until a chunk is available. ``close`` ends iteration.
    """

    def __init__(self):
        self.buff = queue.Queue()
        self.closed = False

    def __iter__(self):
        return self

    def __next__(self):
        return self.next()

    def write(self, data):
        """Enqueue one chunk of audio for the consumer."""
        self.buff.put(data)

    def close(self):
        """Mark the stream closed, discard pending audio and wake the consumer."""
        self.closed = True
        # queue.Queue has no clear(); drain it item by item instead.
        while True:
            try:
                self.buff.get_nowait()
            except queue.Empty:
                break
        # A consumer may be blocked inside buff.get(). The None sentinel makes
        # that call return so next() can observe ``closed`` and stop.
        self.buff.put(None)

    def next(self):
        """Return the next audio chunk, blocking until one is available.

        Raises:
            StopIteration: once the stream has been closed.
        """
        while not self.closed:
            chunk = self.buff.get()
            # None is the wake-up sentinel from close(), never audio data.
            if chunk is not None:
                return chunk
        raise StopIteration
|
|
|
|
|
class Transcoder(object):
    """Coordinates the transcription of a raw audio stream.

    Audio chunks passed to ``write`` are buffered in an ``AudioStream``.
    ``start`` launches a background thread that streams them to the Google
    Cloud Speech API and pushes each converted response onto
    ``result_queue``, from which ``interim_results`` drains them.

    Args:
        sample_rate: passed as ``sample_rate_hertz`` of the recognition config.
        language_code: passed as ``language_code`` of the recognition config.
        encoding: audio encoding enum value; defaults to ``LINEAR16``.
    """

    def __init__(self, sample_rate, language_code,
                 encoding=speech.enums.RecognitionConfig.AudioEncoding.LINEAR16):
        self.sample_rate = sample_rate
        self.language_code = language_code
        self.encoding = encoding
        # Set by start()/stop(); not read anywhere inside this class.
        self.closed = True
        self.audio = AudioStream()
        self.result_queue = queue.Queue()

    def write(self, data):
        """Write a chunk of audio to be transcribed."""
        self.audio.write(data)

    def start(self):
        """Start transcribing audio on a background thread."""
        self.closed = False
        thread = threading.Thread(target=self._process)
        thread.start()

    def stop(self):
        """Stop transcribing audio and close the underlying audio stream."""
        self.closed = True
        self.audio.close()

    def _process(self):
        """Set up the streaming request and collect its responses.

        Runs on the thread created by ``start``. Each response is converted
        with ``_response_to_dict`` and put on ``result_queue``.
        """
        client = speech.SpeechClient()

        config = speech.types.RecognitionConfig(
            encoding=self.encoding,
            sample_rate_hertz=self.sample_rate,
            language_code=self.language_code)
        streaming_config = speech.types.StreamingRecognitionConfig(
            config=config,
            interim_results=True)

        # Give it a config and a generator which produces audio chunks; in
        # return you get an iterator of results.
        responses = client.streaming_recognize(streaming_config, self.generator())

        # This loop blocks until the response iterator is exhausted.
        for response in responses:
            self.result_queue.put(self._response_to_dict(response))

    def _response_to_dict(self, response):
        """Convert a streaming API response into plain Python data.

        Returns:
            A list with one dict per entry in ``response.results``, each
            holding ``stability``, ``is_final`` and a list of
            ``alternatives`` (``transcript`` / ``confidence`` dicts).
            An empty list when ``response`` is ``None``.
        """
        if response is None:
            return []

        return [
            {
                "stability": result.stability,
                "is_final": result.is_final,
                "alternatives": [
                    {
                        "transcript": alt.transcript,
                        "confidence": alt.confidence,
                    }
                    for alt in result.alternatives
                ],
            }
            for result in response.results
        ]

    def interim_results(self, max_results=10):
        """Grab up to ``max_results`` pending results from the queue.

        Never blocks; returns an empty list when nothing is queued.
        """
        results = []
        while len(results) < max_results:
            try:
                results.append(self.result_queue.get_nowait())
            except queue.Empty:
                # The queue module's exception is queue.Empty; there is no
                # queue.QueueEmpty.
                break
        return results

    def generator(self):
        """Generator that yields one streaming request per audio chunk."""
        for chunk in self.audio:
            yield speech.types.StreamingRecognizeRequest(audio_content=chunk)
|
|
|
|
|
# URL routing table: both "/" and "/index.html" map to IndexHandler, while
# "/translate" maps to the AudioWebSocket handler.
_ROUTES = [
    (r"/", IndexHandler),
    (r"/index.html", IndexHandler),
    (r"/translate", AudioWebSocket),
]

app = web.Application(_ROUTES)
|
|
|
if __name__ == "__main__":
    # Certificate and key handed to the HTTP server as its ssl_options.
    tls_settings = dict(
        certfile="tls/server.cert",
        keyfile="tls/server.key",
    )

    https_server = httpserver.HTTPServer(
        app,
        xheaders=True,
        ssl_options=tls_settings,
    )
    https_server.listen(443)

    # Hand control to the Tornado IO loop.
    ioloop.IOLoop.current().start()