Skip to content

Instantly share code, notes, and snippets.

@mgoldey
Created May 11, 2018 22:03
Show Gist options
  • Save mgoldey/6ef09d7a768135c16518770c084067dd to your computer and use it in GitHub Desktop.
Save mgoldey/6ef09d7a768135c16518770c084067dd to your computer and use it in GitHub Desktop.
interface with sqcserver
#!/usr/bin/env python3
from __future__ import print_function
import websocket
import argparse
import json
import time
# Explicitly pass False to the websocket library's trace switch so its
# trace output is off for this client.
websocket.enableTrace(False)
def load_data(filename):
    """Read the file at ``filename`` in binary mode and return all its bytes."""
    with open(filename, mode='rb') as audio_file:
        return audio_file.read()
def cleanTranscript(transcript):
    """Remove ``<...>`` spans from a transcript and strip surrounding whitespace.

    NOTE: this file defines ``cleanTranscript`` a second time further down;
    the later definition is the one bound when the module finishes loading.

    Args:
        transcript: The transcript string, or ``None``.

    Returns:
        ``None`` if ``transcript`` is ``None``; otherwise the text with every
        span from a ``<`` to the next ``>`` removed and the result stripped.
        A ``<`` with no ``>`` after it is left in place.
    """
    if transcript is None:
        return transcript
    kept = []
    pos = 0
    while True:
        start = transcript.find("<", pos)
        if start == -1:
            break
        # Look for the closing bracket only *after* the opening one.  Searching
        # from the beginning of the string would pick up a stray ">" that
        # precedes the tag, which made the text grow on every pass and never
        # terminate.
        end = transcript.find(">", start + 1)
        if end == -1:
            # Unterminated tag: keep the remainder untouched instead of raising.
            break
        kept.append(transcript[pos:start])
        pos = end + 1
    kept.append(transcript[pos:])
    return "".join(kept).strip()
def make_connection(data, address, debug=False):
    """Send audio to the server over a websocket and return the final transcript.

    For each attempt a connection to ``address`` is opened, ``data`` is sent as
    one binary message followed by the text message ``"EOS"``, and JSON
    messages are read until one arrives without a ``"result"`` key or until
    receiving/decoding fails.  The last message that carried a ``"result"``
    key supplies the transcript.  Up to five attempts are made, five seconds
    apart.

    Args:
        data: Bytes of the audio file to send.
        address: Websocket URI, including any query string.
        debug: When true, print the address and the final result message.

    Returns:
        The cleaned transcript of the first hypothesis in the last result
        message, or the string ``"No transcript returned"`` when no attempt
        produced a result.
    """
    if debug:
        print("Connecting to " + address)
    n_tries = 5
    for attempt in range(n_tries):
        final = None  # last message seen that contained a "result" key
        ws = websocket.create_connection(address)
        try:
            ws.send_binary(data)
            ws.send("EOS")
            while True:
                try:
                    message = json.loads(ws.recv())
                except Exception:
                    # A closed/failed socket or a non-JSON payload ends the
                    # stream.  Deliberately not a bare ``except`` so that
                    # KeyboardInterrupt/SystemExit still propagate.
                    break
                if "result" not in message:
                    break
                final = message
        finally:
            # Always release the socket, even when sending raised.
            ws.close()
        if final is not None:
            if debug:
                print(final)
            best = final['result']['hypotheses'][0]
            if "interpreted_quote" in best:
                print(best['interpreted_quote']['imString'])
            return cleanTranscript(best['transcript'])
        # Only announce and wait when another attempt will actually follow.
        if attempt + 1 < n_tries:
            print("Retrying in 5 seconds")
            time.sleep(5)
    return "No transcript returned"
def _build_query_string(content_type, audiofile, rate, interpreter):
    """Return the URI query string holding the content type and interpreter id."""
    if content_type == '' and audiofile.endswith(".raw"):
        # Integer division: ``rate`` is a byte rate and is halved to fill the
        # ``rate=(int)`` field.
        content_type = "audio/x-raw,layout=(string)interleaved,rate=(int)%d,format=(string)S16LE,channels=(int)1" % (
            rate // 2
        )
    if not content_type:
        return "?content-id=" + interpreter
    if not content_type.startswith("?"):
        # A bare content type must be introduced as a query parameter;
        # appending it directly to the URI would corrupt the path.  Values that
        # already start with "?" are passed through unchanged.
        content_type = "?content-type=" + content_type
    return content_type + "&content-id=" + interpreter


def main():
    """Parse command-line arguments, send the audio file, and print the result.

    Builds the query string from ``--content-type``, ``--rate`` and
    ``--interpreter``, appends it to ``--uri``, sends the contents of the
    audio file through ``make_connection`` and prints what it returns.
    """
    parser = argparse.ArgumentParser(description='Command line client for sqcserver')
    parser.add_argument(
        '-u', '--uri', default="ws://localhost:8888/client/ws/speech", dest="uri", help="Server websocket URI"
    )
    parser.add_argument(
        '-d', '--debug', action='store_true', default=False, dest="debug", help="boolean debug flag, defaults to false"
    )
    parser.add_argument(
        '-t',
        '--interpreter',
        default="none",
        dest="interpreter",
        type=str,
        help="interpreter to use in post processing data"
    )
    parser.add_argument(
        '-r',
        '--rate',
        default=128000,
        dest="rate",
        type=int,
        help="Rate in bytes/sec at which audio should be sent to the server. NB! For raw 16-bit audio it must be 2*samplerate!"
    )
    parser.add_argument(
        '--content-type',
        default='',
        help="Use the specified content type (empty by default, for raw files the default is audio/x-raw, layout=(string)interleaved, rate=(int)<rate>, format=(string)S16LE, channels=(int)1"
    )
    parser.add_argument('audiofile', help="Audio file to be sent to the server")
    args = parser.parse_args()

    query = _build_query_string(args.content_type, args.audiofile, args.rate, args.interpreter)
    data = load_data(args.audiofile)
    result = make_connection(data, args.uri + query, args.debug)
    print(result)
def cleanTranscript(transcript):
    """Remove ``<...>`` spans from a transcript and strip surrounding whitespace.

    NOTE: a function with this same name is also defined earlier in this
    file; this later definition replaces it when the module finishes loading.

    Args:
        transcript: The transcript string, or ``None``.

    Returns:
        ``None`` if ``transcript`` is ``None``; otherwise the text with every
        span from a ``<`` to the next ``>`` removed and the result stripped.
        A ``<`` with no ``>`` after it is left in place.
    """
    if transcript is None:
        return transcript
    kept = []
    pos = 0
    while True:
        start = transcript.find("<", pos)
        if start == -1:
            break
        # Look for the closing bracket only *after* the opening one.  Searching
        # from the beginning of the string would pick up a stray ">" that
        # precedes the tag, which made the text grow on every pass and never
        # terminate.
        end = transcript.find(">", start + 1)
        if end == -1:
            # Unterminated tag: keep the remainder untouched instead of raising.
            break
        kept.append(transcript[pos:start])
        pos = end + 1
    kept.append(transcript[pos:])
    return "".join(kept).strip()
# Run the command-line client only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment