Skip to content

Instantly share code, notes, and snippets.

@kstohr
Last active June 15, 2017 07:45
Show Gist options
  • Save kstohr/a1dee0e88e8587576e03804194c7f7df to your computer and use it in GitHub Desktop.
Save kstohr/a1dee0e88e8587576e03804194c7f7df to your computer and use it in GitHub Desktop.
Watson Speech-to-Text Websocket Connection for Python 3.5 (function call)
# coding: utf-8
"""
websocket connection to watson speech-to-text API
adapted from: https://github.com/watson-developer-cloud/speech-to-text-websockets-python
speech recognition using the WebSocket interface to the Watson Speech-to-Text service
This is an updated version of the sample websocket connection created by the IBM Watson team (@daniel-bolanos) and included in their documentation.
Main changes:
- adapted for Python 3 (tested on Python 3.5)
- updated to include latest features (i.e. 'speaker_labels')
- adapted from CLI to function call for use in scripts rather than command line
- option to log responses to a log file rather than to the console (large files can cause IDEs to freeze)
To run:
# import watson_stt_ws
# watson_ws_connect (file_input, dir_output, model, threads, log_type, params)
Known issues:
If you run this from the interpreter and an exception is raised from a function within WSInterfaceProtocol
or the WSInterfaceFactory class the websocket connection may fail without properly shutting down active
threads. In which case, any error or exception messages may not be raised causing the connection to fail silently or hang.
In addition you may have to force the running threads to quit (try running sys.exit() from the interpreter)
before retrying the connection.
Ideas on how to 'listen' for errors, exceptions and close down the threads more gracefully welcomed...
"""
import json # json
import threading # multi threading
import os # for listing directories
import queue as Queue # queue used for thread syncronization
import sys # system calls
import base64 # necessary to encode in base64 according to the RFC2045 standard
import certifi # validate secure ssl connection
import requests # python HTTP requests library
from os.path import join, dirname # filepath manipulation
import datetime as dt # datetime objects
# WebSockets
from autobahn.twisted.websocket import WebSocketClientProtocol, WebSocketClientFactory, connectWS
from twisted.python import log
from twisted.internet import ssl, reactor
# App settings
from error_classes import * #error classes
from dotenv import load_dotenv # import credentials from .env
# Watson credentials (from .env file) // alt set manually
# NOTE: dirname() is applied to the string literal "__file__" (not the module
# attribute), which yields '', so '.env' is looked up relative to the current
# working directory.
dotenv_path = join(dirname("__file__"), '.env')
load_dotenv(dotenv_path)
username = os.environ['WATSON_USERNAME']
password = os.environ['WATSON_PASSWORD']

# Connection settings
hostname = "stream.watsonplatform.net"  # set base connection url
serviceName = 'speech-to-text'  # set watson service
model = 'en-US_BroadbandModel'  # set watson model
threads = 4  # set number of threads
# setting to file will save a log file to output directory,
# set to 'False' to log to console
log_file = True
search_keywords = []  # list of keywords
codec = '.wav'  # or 'flac' etc.

# Initialization message sent to the service when a WebSocket session opens.
params = {
    "action": "start",  # required
    # Strip any leading dot so that '.wav' produces the MIME type 'audio/wav'
    # rather than 'audio/.wav'.
    "content-type": 'audio/' + str(codec).lstrip('.'),  # required
    "interim_results": False,
    "inactivity_timeout": -1,  # set to -1 to ignore silence
    'word_alternatives_threshold': .75,
    'word_confidence': True,
    'timestamps': True,
    'max_alternatives': 1,
    'speaker_labels': True,
    'keywords': search_keywords,
    'keywords_threshold': .6,
    'profanity_filter': False,
    'smart_formatting': True,
    #'x-watson-learning-opt-out': False # Per API reference, but yields a warning in transcript.
}
class Utils:
    """Helper routines for talking to the Watson service."""

    @staticmethod
    def getAuthenticationToken(hostname, serviceName, username, password):
        """Fetch an authentication token for a Watson service.

        Args:
            hostname: base URL of the Watson host, including the scheme; a
                ``ws://`` or ``wss://`` scheme is rewritten to ``https://``.
            serviceName: name of the service the token is requested for.
            username: user name sent as HTTP basic-auth credential.
            password: password sent as HTTP basic-auth credential.

        Returns:
            The value stored under the ``'token'`` key of the JSON response.

        Raises:
            Exception: if building the request, performing it, or reading
                the token from the response fails. The original error is
                attached as ``__cause__``. The password is deliberately NOT
                included in the message so it cannot leak into logs.
        """
        try:
            uri = (hostname + "/authorization/api/v1/token?url="
                   + hostname + '/' + serviceName + "/api")
            uri = uri.replace("wss://", "https://")
            uri = uri.replace("ws://", "https://")
            log.msg(uri)
            resp = requests.get(uri, auth=(username, password),
                                verify=certifi.where(),
                                headers={'Accept': 'application/json'},
                                timeout=(30, 30))
            log.msg(resp.text)
            jsonObject = resp.json()
            return jsonObject['token']
        except Exception as err:
            # `except Exception` (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate untouched.
            raise Exception(
                'getAuthenticationToken failed. Check Watson credentials '
                'for user: {0}'.format(username)) from err
class WSInterfaceFactory(WebSocketClientFactory):
    """Client factory that hands queued utterances to new protocol instances.

    ``queue`` holds every utterance still to be processed; ``queueProto``
    holds the utterances that have been reserved for a connection which is
    about to be opened (see ``prepareUtterance`` / ``buildProtocol``).
    """

    def __init__(self, queue, dirOutput, contentType, model, params,
                 url=None, headers=None, debug=None):
        """Store the settings and start the thread that stops the reactor.

        Args:
            queue: queue of utterances waiting to be sent.
            dirOutput: directory passed on to every protocol instance.
            contentType: content type passed on to every protocol instance.
            model: stored on the factory as ``self.model``.
            params: parameters passed on to every protocol instance.
            url: WebSocket URL forwarded to ``WebSocketClientFactory``.
            headers: HTTP headers forwarded to ``WebSocketClientFactory``.
            debug: accepted for signature compatibility; not used here.
        """
        WebSocketClientFactory.__init__(self, url=url, headers=headers)
        self.queue = queue
        self.dirOutput = dirOutput
        self.contentType = contentType
        self.model = model
        self.params = params
        self.queueProto = Queue.Queue()
        self.openHandshakeTimeout = 10
        self.closeHandshakeTimeout = 10

        # start the thread that takes care of ending the reactor so
        # the script can finish automatically (without ctrl+c)
        endingThread = threading.Thread(target=self.endReactor, args=())
        # ALERT: causes errors in handling functions not to be raised
        endingThread.daemon = True
        endingThread.start()

    def prepareUtterance(self):
        """Reserve the next utterance for an upcoming connection.

        Returns:
            True if an utterance was moved from ``queue`` to ``queueProto``,
            False if ``queue`` was empty.
        """
        try:
            utt = self.queue.get_nowait()
        except Queue.Empty:
            log.msg("getUtterance: no more utterances to process, queue is empty!")
            return False
        self.queueProto.put(utt)
        return True

    def endReactor(self):
        """Block until every queued utterance is done, then stop the reactor.

        Runs in the helper thread started by ``__init__``.
        """
        self.queue.join()
        log.msg("about to stop the reactor!")
        # This runs outside the reactor thread, so the stop request must be
        # handed over with callFromThread instead of calling stop() directly.
        reactor.callFromThread(reactor.stop)

    # this function gets called every time connectWS is called (once
    # per WebSocket connection/session)
    def buildProtocol(self, addr):
        """Create a protocol instance bound to the next reserved utterance.

        Raises:
            Exception: if no utterance was reserved via ``prepareUtterance``.
        """
        try:
            utt = self.queueProto.get_nowait()
        except Queue.Empty:
            log.msg("queue should not be empty, otherwise this function should not have been called")
            raise Exception("Watson file queue should not be empty.")
        proto = WSInterfaceProtocol(self, self.queue, self.params,
                                    self.dirOutput, self.contentType)
        proto.setUtterance(utt)
        return proto
# WebSockets interface to the STT service
# Note: an object of this class is created for each WebSocket
# connection, every time we call connectWS
class WSInterfaceProtocol(WebSocketClientProtocol):
    """Protocol that streams one audio file and stores the JSON responses."""

    def __init__(self, factory, queue, params, dirOutput, contentType):
        """Store the session settings and log the current queue size.

        Args:
            factory: the factory that created this protocol.
            queue: queue of utterances still waiting to be processed.
            params: dict sent as the initialization message in ``onOpen``.
            dirOutput: directory the JSON transcript is written to.
            contentType: content type of the audio (only logged here).
        """
        self.factory = factory
        self.queue = queue
        self.params = params
        self.dirOutput = dirOutput
        self.contentType = contentType
        self.packetRate = 20
        self.listeningMessages = 0
        self.timeFirstInterim = -1
        self.bytesSent = 0
        self.chunkSize = 2000  # in bytes
        # super() without arguments: super(self.__class__, self) would
        # recurse forever if this class were ever subclassed.
        super().__init__()
        print("queueSize: " + str(self.queue.qsize()))
        log.msg("contentType: " + str(self.contentType) + "\n"
                "queueSize: " + str(self.queue.qsize()) + "\n"
                "Transcipts saved to: " + dirOutput)

    def setUtterance(self, utt):
        """Bind this connection to one utterance.

        Args:
            utt: ``(number, filename)`` pair taken from the queue.

        Any transcript left over from a previous run for the same file name
        is removed, because ``onMessage`` appends to the output file.
        """
        self.uttNumber = utt[0]
        self.uttFilename = utt[1]
        baseName = os.path.splitext(os.path.basename(self.uttFilename))[0]
        self.fileJson = os.path.join(self.dirOutput, str(baseName) + ".json")
        try:
            os.remove(self.fileJson)
        except OSError:
            # nothing to remove (or not removable): start appending anyway
            pass

    # Helper method that sends a chunk of audio if needed (as required
    # what the specified pacing is)
    def maybeSendChunk(self, data):
        """Send the next chunk of ``data`` and schedule the following one.

        Args:
            data: the complete audio content as bytes.

        Chunks of ``self.chunkSize`` bytes are sent 0.01 s apart; the last
        chunk is followed by an empty binary message (presumably the
        end-of-audio marker -- confirm against the service documentation).
        """
        def sendChunk(chunk, final=False):
            self.bytesSent += len(chunk)
            self.sendMessage(chunk, isBinary=True)
            if final:
                self.sendMessage(b'', isBinary=True)

        total = len(data)
        if self.bytesSent + self.chunkSize >= total:
            # last (possibly short) chunk; nothing is sent when no bytes
            # remain, and no further call is scheduled either way
            if total > self.bytesSent:
                sendChunk(data[self.bytesSent:total], True)
            return
        sendChunk(data[self.bytesSent:self.bytesSent + self.chunkSize])
        self.factory.reactor.callLater(0.01, self.maybeSendChunk, data=data)

    def onConnect(self, response):
        """Log the peer once the connection is established."""
        log.msg("onConnect, server connected: {0}".format(response.peer))

    def onOpen(self):
        """Send the initialization message, then start streaming the audio."""
        log.msg("onOpen")
        log.msg("sendMessage(init)")
        # send the initialization parameters given to this protocol (not the
        # module-level defaults), so caller-supplied params take effect
        self.sendMessage(json.dumps(self.params).encode('utf8'))
        # start sending audio right away (it will get buffered in the
        # STT service)
        print("Sending file: " + str(self.uttNumber) + ": " + str(self.uttFilename))
        log.msg("Sending file: " + str(self.uttNumber) + ": " + str(self.uttFilename))
        with open(str(self.uttFilename), 'rb') as audioFile:
            dataFile = audioFile.read()
        self.bytesSent = 0
        self.maybeSendChunk(dataFile)
        # NOTE: only the first chunk has gone out at this point; the rest is
        # scheduled by maybeSendChunk.
        log.msg("File sent: " + str(self.uttNumber) + ": " + str(self.uttFilename))
        log.msg("onOpen ends.")

    def onMessage(self, payload, isBinary):
        """Handle a message from the service.

        Text messages are parsed as JSON. A message containing ``'state'``
        is counted and the second one triggers a normal close (code 1000).
        A message containing ``'results'`` is appended to the transcript
        file. Other text messages are ignored.

        Raises:
            Exception: if a ``'results'`` message carries an empty list.
        """
        if isBinary:
            print("Binary message received: {0} bytes".format(len(payload)))
            return

        jsonObject = json.loads(payload.decode('utf8'))
        if 'state' in jsonObject:
            self.listeningMessages += 1
            if self.listeningMessages == 2:
                log.msg("sending close 1000")
                # close the connection
                self.sendClose(1000)
        # if in streaming
        elif 'results' in jsonObject:
            # empty hypothesis
            if len(jsonObject['results']) == 0:
                raise Exception("Watson returned empty results!")
            # regular hypothesis: dump the message to the output directory
            with open(self.fileJson, "a") as transcript:
                transcript.write(json.dumps(jsonObject, indent=4, sort_keys=True))

    def onClose(self, wasClean, code, reason):
        """Finish the current utterance and open a connection for the next.

        Raises:
            Exception: if the connection closed with a code other than 1000.
                NOTE: on this path ``queue.task_done()`` is not called, so a
                thread waiting in ``queue.join()`` (the factory's
                ``endReactor``) does not return.
        """
        log.msg("onClose")
        if code != 1000:
            print("WebSocket connection error: {0}".format(reason), "code: ",
                  code, "clean: ", wasClean, "reason: ", reason)
            raise Exception("WebSocket connection error: {0}".format(reason), "code: ",
                            code, "clean: ", wasClean, "reason: ", reason)

        log.msg("WebSocket connection closed: {0}".format(reason), "code: ",
                code, "clean: ", wasClean, "reason: ", reason)
        print("WebSocket connection closed. Status code: {0}".format(code))
        # create a new WebSocket connection if there are still
        # utterances in the queue that need to be processed
        self.queue.task_done()
        if not self.factory.prepareUtterance():
            return
        # SSL client context: default
        if self.factory.isSecure:
            contextFactory = ssl.ClientContextFactory()
        else:
            contextFactory = None
        connectWS(self.factory, contextFactory)
#this is the main function call
#call this function, or import it into another script, to run the websocket connection
def watson_ws_connect(file_input, dir_output, model, threads, log_type, params):
    """Transcribe audio files through the Watson WebSocket interface.

    Blocks in ``reactor.run()`` until the reactor is stopped.

    Args:
        file_input: iterable of audio file paths to send.
        dir_output: directory for transcripts and the log file; created if
            it does not exist.
        model: model name appended to the recognize URL.
        threads: maximum number of connections opened up front.
        log_type: currently unused -- logging is controlled by the
            module-level ``log_file`` flag.
            NOTE(review): confirm whether this was meant to replace it.
        params: initialization parameters; must contain ``'content-type'``.
    """
    contentType = params['content-type']
    now = dt.datetime.now()

    # create output directory if necessary
    if not os.path.isdir(dir_output):
        os.makedirs(dir_output)
    print("Transcipts to be saved to: " + dir_output)

    # logging from Twisted module
    if log_file == True:
        # os.path.join keeps the log inside dir_output even when the caller
        # passes the directory without a trailing separator
        logPath = os.path.join(dir_output, str(now) + "_" + "transcript.log")
        # the handle is handed to the Twisted logger and stays open
        log.startLogging(open(logPath, 'w'), setStdout=False)
    else:
        log.startLogging(sys.stdout)

    # add audio files to the processing queue
    q = Queue.Queue()
    for fileNumber, fileName in enumerate(file_input):
        log.msg(fileName)
        q.put((fileNumber, fileName))

    # With nothing queued no connection would ever be opened and the reactor
    # would be started with no work to do, so stop here.
    if q.empty():
        log.msg("no input files given, nothing to transcribe")
        return

    # authentication header
    headers = {
        'X-Watson-Authorization-Token': Utils.getAuthenticationToken(
            "https://" + hostname, serviceName, username, password),
    }

    # create a WS server factory with our protocol
    url = "wss://" + hostname + "/speech-to-text/api/v1/recognize?model=" + model
    factory = WSInterfaceFactory(q, dir_output, contentType,
                                 model, params, url, headers, debug=False)
    factory.protocol = WSInterfaceProtocol

    # open up to `threads` connections, one reserved utterance each
    for _ in range(min(int(threads), q.qsize())):
        factory.prepareUtterance()
        # SSL client context: default
        if factory.isSecure:
            contextFactory = ssl.ClientContextFactory()
        else:
            contextFactory = None
        connectWS(factory, contextFactory)

    reactor.run()
@kstohr
Copy link
Author

kstohr commented Jun 15, 2017

If you are looking for something that will run from the command line for Python 3 (i.e. an updated version of the original python sample code)...
try this: speech-to-text-websockets-for-python-3

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment