Last active
June 15, 2017 07:45
-
-
Save kstohr/a1dee0e88e8587576e03804194c7f7df to your computer and use it in GitHub Desktop.
Watson Speech-to-Text Websocket Connection for Python 3.5 (function call)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
""" | |
websocket connection to watson speech-to-text API | |
adapted from: https://github.com/watson-developer-cloud/speech-to-text-websockets-python | |
speech recognition using the WebSocket interface to the Watson Speech-to-Text service | |
This is an updated version of the sample websocket connection created by the IBM Watson team (@daniel-bolanos) and included in their documentation. | |
Main changes: | |
- adapted for Python 3 (tested on Python 3.5) | |
- updated to include latest features (i.e. 'speaker_labels') | |
- adapted from CLI to function call for use in scripts rather than command line | |
- option to log responses to a log file rather than to the console (large files can cause IDEs to freeze) | |
To run: | |
# import watson_stt_ws | |
# watson_ws_connect (file_input, dir_output, model, threads, log_type, params) | |
Known issues: | |
If you run this from the interpreter and an exception is raised from a function within WSInterfaceProtocol | |
or the WSInterfaceFactory class the websocket connection may fail without properly shutting down active | |
threads. In which case, any error or exception messages may not be raised causing the connection to fail silently or hang. | |
In addition you may have to force the running threads to quit (try running sys.exit() from the interpreter) | |
before retrying the connection. | |
Ideas on how to 'listen' for errors, exceptions and close down the threads more gracefully welcomed... | |
""" | |
import json # json | |
import threading # multi threading | |
import os # for listing directories | |
import queue as Queue # queue used for thread syncronization | |
import sys # system calls | |
import base64 # necessary to encode in base64 according to the RFC2045 standard | |
import certifi # validate secure ssl connection | |
import requests # python HTTP requests library | |
from os.path import join, dirname # filepath manipulation | |
import datetime as dt # datetime objects | |
# WebSockets | |
from autobahn.twisted.websocket import WebSocketClientProtocol, WebSocketClientFactory, connectWS | |
from twisted.python import log | |
from twisted.internet import ssl, reactor | |
# App settings | |
from error_classes import * #error classes | |
from dotenv import load_dotenv # import credentials from .env | |
# Watson credentials (from .env file) // alt set manually
#
# Look for the .env file next to this module first.  The previous code passed
# the *string* "__file__" to dirname(), which always yields '' and therefore
# only ever resolved to '.env' in the current working directory.
try:
    _module_dir = dirname(os.path.abspath(__file__))
except NameError:
    # __file__ is undefined when this code is pasted into an interactive session
    _module_dir = ''
dotenv_path = join(_module_dir, '.env')
if not os.path.isfile(dotenv_path):
    # fall back to the historical lookup location (current working directory)
    dotenv_path = '.env'
load_dotenv(dotenv_path)
# Both variables are required; a missing one raises KeyError at import time.
username = os.environ['WATSON_USERNAME']
password = os.environ['WATSON_PASSWORD']

# Connection settings
hostname = "stream.watsonplatform.net"  # set base connection url
serviceName = 'speech-to-text'  # set watson service
model = 'en-US_BroadbandModel'  # set watson model
threads = 4  # set number of threads
log_file = True  # setting to file will save a log file to output directory, set to 'False' to log to console
search_keywords = []  # list of keywords
codec = '.wav'  # or 'flac' etc.
params = {
    "action": "start",  # required
    # required; any leading dot is stripped so '.wav' and 'wav' both give
    # 'audio/wav' instead of the malformed 'audio/.wav'
    "content-type": 'audio/' + str(codec).lstrip('.'),
    "interim_results": False,
    "inactivity_timeout": -1,  # set to -1 to ignore silence
    'word_alternatives_threshold': .75,
    'word_confidence': True,
    'timestamps': True,
    'max_alternatives': 1,
    'speaker_labels': True,
    'keywords': search_keywords,
    'keywords_threshold': .6,
    'profanity_filter': False,
    'smart_formatting': True,
    # 'x-watson-learning-opt-out': False  # Per API reference, but yields a warning in transcript.
}
class Utils:
    """Namespace for helpers that are not tied to a single connection."""

    @staticmethod
    def getAuthenticationToken(hostname, serviceName, username, password):
        """Fetch an authorization token for ``serviceName`` using basic auth.

        Args:
            hostname: base URL of the service, including the scheme.  A
                ``ws://`` or ``wss://`` scheme is rewritten to ``https://``.
            serviceName: service path segment, e.g. ``'speech-to-text'``.
            username: service username, sent as HTTP basic auth.
            password: service password, sent as HTTP basic auth.

        Returns:
            The value of the ``'token'`` field of the JSON response.

        Raises:
            Exception: if building the request, the HTTP call, or decoding
                the response fails.  The original error is attached as
                ``__cause__``.  The password is deliberately left out of the
                message so it cannot end up in logs or tracebacks.
        """
        try:
            uri = hostname + "/authorization/api/v1/token?url=" + hostname + '/' + serviceName + "/api"
            uri = uri.replace("wss://", "https://")
            uri = uri.replace("ws://", "https://")
            log.msg(uri)
            resp = requests.get(uri, auth=(username, password), verify=certifi.where(),
                                headers={'Accept': 'application/json'},
                                timeout=(30, 30))
            # Log only the status: the response body contains the token itself.
            log.msg("token request returned HTTP {0}".format(resp.status_code))
            # Surface 4xx/5xx as the failure cause rather than a later KeyError.
            resp.raise_for_status()
            jsonObject = resp.json()
            return jsonObject['token']
        except Exception as err:
            # `except Exception` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate untouched.
            raise Exception(
                'getAuthenticationToken failed. Check Watson credentials '
                '(username: {0})'.format(username)
            ) from err
class WSInterfaceFactory(WebSocketClientFactory):
    """Client factory that hands one queued utterance to each new connection.

    ``queue`` holds ``(number, filename)`` tuples still to be processed;
    ``queueProto`` holds the utterances already claimed by
    ``prepareUtterance`` and waiting for ``buildProtocol`` to attach them to
    a protocol instance.
    """

    def __init__(self, queue, dirOutput, contentType, model, params,
                 url=None, headers=None, debug=None):
        """Store the settings and start the thread that stops the reactor.

        ``debug`` is accepted for call compatibility but is not used here.
        """
        WebSocketClientFactory.__init__(self, url=url, headers=headers)
        self.queue = queue
        self.dirOutput = dirOutput
        self.contentType = contentType
        self.model = model
        self.params = params
        self.queueProto = Queue.Queue()
        self.openHandshakeTimeout = 10
        self.closeHandshakeTimeout = 10

        # start the thread that takes care of ending the reactor so
        # the script can finish automatically (without ctrl+c)
        endingThread = threading.Thread(target=self.endReactor, args=())
        # ALERT (from the original author): running this as a daemon thread
        # causes errors in handling functions not to be raised
        endingThread.daemon = True
        endingThread.start()

    def prepareUtterance(self):
        """Move the next utterance from ``queue`` to ``queueProto``.

        Returns:
            True if an utterance was moved, False if ``queue`` was empty.
        """
        try:
            utt = self.queue.get_nowait()
        except Queue.Empty:
            log.msg("getUtterance: no more utterances to process, queue is empty!")
            return False
        self.queueProto.put(utt)
        return True

    def endReactor(self):
        """Block until every queued utterance is marked done, then stop the reactor."""
        self.queue.join()
        log.msg("about to stop the reactor!")
        # This method runs in `endingThread`, not in the reactor thread.
        # Reactor APIs are not thread-safe, so the stop request has to be
        # handed over with callFromThread instead of calling stop() directly.
        reactor.callFromThread(reactor.stop)

    # this function gets called every time connectWS is called (once
    # per WebSocket connection/session)
    def buildProtocol(self, addr):
        """Create a protocol instance bound to the next prepared utterance.

        Raises:
            Exception: if no utterance was prepared before connecting.
        """
        try:
            utt = self.queueProto.get_nowait()
        except Queue.Empty:
            log.msg("queue should not be empty, otherwise this function should not have been called")
            raise Exception("Watson file queue should not be empty.")
        proto = WSInterfaceProtocol(self, self.queue, self.params,
                                    self.dirOutput, self.contentType)
        proto.setUtterance(utt)
        return proto
# WebSockets interface to the STT service | |
# Note: an object of this class is created for each WebSocket | |
# connection, every time we call connectWS | |
class WSInterfaceProtocol(WebSocketClientProtocol):
    """WebSocket protocol that streams one audio file and stores the results.

    One instance exists per connection.  The utterance it is responsible for
    is attached with ``setUtterance`` before the connection opens.
    """

    def __init__(self, factory, queue, params, dirOutput, contentType):
        """Remember the shared factory/queue and the per-run settings."""
        self.factory = factory
        self.queue = queue
        self.params = params
        self.dirOutput = dirOutput
        self.contentType = contentType
        self.packetRate = 20
        self.listeningMessages = 0
        self.timeFirstInterim = -1
        self.bytesSent = 0
        self.chunkSize = 2000  # in bytes
        # Zero-argument super(): super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        print("queueSize: " + str(self.queue.qsize()))
        log.msg("contentType: " + str(self.contentType) + "\n"
                "queueSize: " + str(self.queue.qsize()) + "\n"
                "Transcipts saved to: " + dirOutput)

    def setUtterance(self, utt):
        """Bind this connection to ``utt``, a ``(number, filename)`` tuple.

        Also derives the output path ``<dirOutput>/<basename>.json`` and
        deletes any existing file there, because results are appended.
        """
        self.uttNumber = utt[0]
        self.uttFilename = utt[1]
        split_path = os.path.split(self.uttFilename)[1]
        filename = os.path.splitext(split_path)[0]
        self.fileJson = self.dirOutput + "/" + str(filename) + ".json"
        try:
            os.remove(self.fileJson)
        except OSError:
            # best effort: typically there is simply no previous output file
            pass

    # Helper method that sends a chunk of audio if needed (as required
    # what the specified pacing is)
    def maybeSendChunk(self, data):
        """Send the next ``chunkSize`` bytes of ``data`` and reschedule itself.

        The last chunk is followed by an empty binary frame.
        NOTE(review): the empty frame is presumably the service's
        end-of-audio marker - confirm against the Watson WebSocket reference.
        """
        def sendChunk(chunk, final=False):
            self.bytesSent += len(chunk)
            self.sendMessage(chunk, isBinary=True)
            if final:
                self.sendMessage(b'', isBinary=True)

        if self.bytesSent + self.chunkSize >= len(data):
            if len(data) > self.bytesSent:
                sendChunk(data[self.bytesSent:len(data)], True)
            else:
                # Nothing left to send (e.g. an empty input file): still emit
                # the terminating empty frame exactly once so the exchange
                # can finish instead of stalling.
                self.sendMessage(b'', isBinary=True)
            return
        sendChunk(data[self.bytesSent:self.bytesSent + self.chunkSize])
        self.factory.reactor.callLater(0.01, self.maybeSendChunk, data=data)

    def onConnect(self, response):
        """Log the peer once the TCP/TLS connection is established."""
        log.msg("onConnect, server connected: {0}".format(response.peer))

    def onOpen(self):
        """Send the start message, then begin streaming the audio file."""
        log.msg("onOpen")
        # Use the params handed to this connection, not the module-level
        # default, so the caller's settings are the ones actually sent.
        data = self.params
        log.msg("sendMessage(init)")
        # send the initialization parameters
        self.sendMessage(json.dumps(data).encode('utf8'))
        # start sending audio right away (it will get buffered in the
        # STT service)
        print("Sending file: " + str(self.uttNumber) + ": " + str(self.uttFilename))
        log.msg("Sending file: " + str(self.uttNumber) + ": " + str(self.uttFilename))
        # `with` closes the handle even if read() fails
        with open(str(self.uttFilename), 'rb') as f:
            dataFile = f.read()
        self.bytesSent = 0
        self.maybeSendChunk(dataFile)
        # Only the first chunk has gone out here; the rest is sent by the
        # callLater chain started in maybeSendChunk.
        log.msg("File sent: " + str(self.uttNumber) + ": " + str(self.uttFilename))
        log.msg("onOpen ends.")

    def onMessage(self, payload, isBinary):
        """Handle a server message: count state messages, append results.

        The connection is closed with code 1000 on the second message that
        carries a ``'state'`` key.  Messages with a non-empty ``'results'``
        list are appended to ``self.fileJson`` as pretty-printed JSON.

        Raises:
            Exception: if a message carries an empty ``'results'`` list.
        """
        if isBinary:
            print("Binary message received: {0} bytes".format(len(payload)))
            return
        # parse once; the payload does not change between the checks below
        jsonObject = json.loads(payload.decode('utf8'))
        if 'state' in jsonObject:
            self.listeningMessages += 1
            if self.listeningMessages == 2:
                log.msg("sending close 1000")
                # close the connection
                self.sendClose(1000)
        # if in streaming
        elif 'results' in jsonObject:
            # empty hypothesis
            if len(jsonObject['results']) == 0:
                raise Exception("Watson returned empty results!")
            # regular hypothesis: dump the message to the output directory
            # NOTE(review): messages with neither 'state' nor 'results' are
            # dropped silently - confirm nothing useful arrives that way.
            with open(self.fileJson, "a") as f:
                f.write(json.dumps(jsonObject, indent=4, sort_keys=True))

    def onClose(self, wasClean, code, reason):
        """Report how the connection ended and move on to the next utterance.

        Raises:
            Exception: if ``code`` is not 1000.  The queue bookkeeping below
                still runs first, so a failed connection can no longer leave
                ``queue.join()`` (and with it the reactor) waiting forever.
        """
        log.msg("onClose")
        try:
            if code != 1000:
                print("WebSocket connection error: {0}".format(reason), "code: ",
                      code, "clean: ", wasClean, "reason: ", reason)
                raise Exception("WebSocket connection error: {0}".format(reason), "code: ",
                                code, "clean: ", wasClean, "reason: ", reason)
            log.msg("WebSocket connection closed: {0}".format(reason), "code: ",
                    code, "clean: ", wasClean, "reason: ", reason)
            print("WebSocket connection closed. Status code: {0}".format(code))
        finally:
            self._startNextUtterance()

    def _startNextUtterance(self):
        """Mark this utterance done and open a connection for the next one."""
        self.queue.task_done()
        # create a new WebSocket connection if there are still
        # utterances in the queue that need to be processed
        if not self.factory.prepareUtterance():
            return
        # SSL client context: default
        if self.factory.isSecure:
            contextFactory = ssl.ClientContextFactory()
        else:
            contextFactory = None
        connectWS(self.factory, contextFactory)
#this is the main function call | |
#call this function, or import it into another script, to run the websocket connection | |
def watson_ws_connect(file_input, dir_output, model, threads, log_type, params):
    """Transcribe ``file_input`` over WebSocket connections and block until done.

    Args:
        file_input: iterable of audio file paths.
        dir_output: directory for the JSON transcripts (created if missing).
        model: model name appended to the recognize URL.
        threads: maximum number of simultaneous connections; must be >= 1.
        log_type: truthy to write the Twisted log to a timestamped file inside
            ``dir_output``, falsy to log to stdout.
        params: start-message parameters; must contain ``'content-type'``.

    Raises:
        ValueError: if ``threads`` is less than 1 (no connection would be
            opened and the call would never return).

    The call returns when ``reactor.run()`` returns.
    NOTE(review): a Twisted reactor can presumably be run only once per
    process, so this function may not be callable twice - confirm.
    """
    # Validate before any side effects (directory creation, logging, network).
    connections = int(threads)
    if connections < 1:
        raise ValueError("threads must be >= 1, got {0!r}".format(threads))

    dirOutput = dir_output
    contentType = params['content-type']

    # create output directory if necessary
    if not os.path.isdir(dirOutput):
        os.makedirs(dirOutput)
    print("Transcipts to be saved to: " + dirOutput)

    # logging from Twisted module
    # Honour the log_type argument; the module-level `log_file` default used
    # to be consulted here, which made the parameter a no-op.
    if log_type:
        # Join with a path separator so the log lands inside dirOutput, and
        # use a timestamp without spaces or colons so the name is valid on
        # every filesystem.
        stamp = dt.datetime.now().strftime("%Y-%m-%d_%H-%M-%S-%f")
        logPath = os.path.join(dirOutput, stamp + "_transcript.log")
        # the handle is owned by the Twisted log observer from here on
        log.startLogging(open(logPath, 'w'), setStdout=False)
    else:
        log.startLogging(sys.stdout)

    # add audio files to the processing queue
    q = Queue.Queue()
    for fileNumber, fileName in enumerate(file_input):
        log.msg(fileName)
        q.put((fileNumber, fileName))

    if q.empty():
        # With nothing queued no connection is ever opened; return instead of
        # starting a reactor that has no work to do.
        log.msg("no input files given, nothing to transcribe")
        return

    # authentication header
    headers = {
        'X-Watson-Authorization-Token': Utils.getAuthenticationToken(
            "https://" + hostname, serviceName, username, password),
    }

    # create a WS server factory with our protocol
    url = "wss://" + hostname + "/speech-to-text/api/v1/recognize?model=" + model
    factory = WSInterfaceFactory(q, dirOutput, contentType,
                                 model, params, url, headers, debug=False)
    factory.protocol = WSInterfaceProtocol

    # open one connection per allowed thread, but never more than there are files
    for _ in range(min(connections, q.qsize())):
        factory.prepareUtterance()
        # SSL client context: default
        # NOTE(review): ssl.ClientContextFactory presumably performs no
        # certificate verification - confirm and consider a verifying factory.
        if factory.isSecure:
            contextFactory = ssl.ClientContextFactory()
        else:
            contextFactory = None
        connectWS(factory, contextFactory)

    reactor.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
If you are looking for something that will run from the command line for Python 3 (i.e. an updated version of the original python sample code)...
try this: speech-to-text-websockets-for-python-3