Skip to content

Instantly share code, notes, and snippets.

@thepycoach
Created January 19, 2022 02:07
Show Gist options
  • Select an option

  • Save thepycoach/79f41f1d0f5c59ea15aacd2a41459415 to your computer and use it in GitHub Desktop.

Select an option

Save thepycoach/79f41f1d0f5c59ea15aacd2a41459415 to your computer and use it in GitHub Desktop.
import websockets
import asyncio
import base64
import json
from configure import auth_key
# Secure WebSocket (wss://) endpoint of AssemblyAI's real-time API that this
# script connects to. The `sample_rate=16000` query parameter is presumably the
# audio sample rate in Hz and should match the audio being sent -- TODO confirm
# against the API documentation.
URL = "wss://api.assemblyai.com/v2/realtime/ws?sample_rate=16000"
async def send_receive():
    """Send audio from ``stream`` over a WebSocket and print the replies.

    Connects to ``URL`` with ``auth_key`` in the ``Authorization`` header,
    prints the first message the server sends, then runs two coroutines
    concurrently until the connection is closed:

    * ``send`` reads ``FRAMES_PER_BUFFER`` frames from the module-level
      ``stream``, base64-encodes them and sends them as
      ``{"audio_data": ...}`` JSON messages, pausing 10 ms between sends.
    * ``receive`` prints the ``"text"`` field of each JSON message received.

    Raises:
        AssertionError: if the connection is closed with an error code other
            than 4008, or if any other exception occurs while sending or
            receiving (the original exception is attached as ``__cause__``).
    """
    # NOTE(review): 4008 is the only close code treated as a normal end of
    # session here; presumably it is the service's "session ended" code --
    # confirm against the API documentation.
    expected_close_code = 4008

    def require_expected_close(exc):
        """Raise AssertionError unless *exc* carries the expected close code."""
        # Raised explicitly rather than via `assert`: assert statements are
        # stripped under `python -O`, which would silently accept any code.
        if exc.code != expected_close_code:
            raise AssertionError(
                f"Unexpected websocket close code {exc.code}"
            ) from exc

    print(f'Connecting websocket to url {URL}')
    async with websockets.connect(
        URL,
        extra_headers=(("Authorization", auth_key),),
        ping_interval=5,
        ping_timeout=20,
    ) as _ws:
        # Short pause kept from the original before reading the first message.
        await asyncio.sleep(0.1)
        print("Receiving SessionBegins ...")
        session_begins = await _ws.recv()
        print(session_begins)
        print("Sending messages ...")

        async def send():
            """Forward audio buffers to the server until the socket closes."""
            while True:
                try:
                    # NOTE(review): stream.read looks like a synchronous call
                    # and may block the event loop while a buffer fills --
                    # confirm, and consider running it in an executor.
                    frames = stream.read(FRAMES_PER_BUFFER)
                    encoded = base64.b64encode(frames).decode("utf-8")
                    await _ws.send(json.dumps({"audio_data": encoded}))
                except websockets.exceptions.ConnectionClosedError as exc:
                    print(exc)
                    require_expected_close(exc)
                    break
                except Exception as exc:
                    # Explicit raise so the loop cannot spin forever on a
                    # persistent error when assertions are disabled (-O).
                    raise AssertionError("Not a websocket 4008 error") from exc
                # Yield to the event loop so receive() gets a chance to run.
                await asyncio.sleep(0.01)
            return True

        async def receive():
            """Print the text of each server message until the socket closes."""
            while True:
                try:
                    result_str = await _ws.recv()
                    print(json.loads(result_str)['text'])
                except websockets.exceptions.ConnectionClosedError as exc:
                    print(exc)
                    require_expected_close(exc)
                    break
                except Exception as exc:
                    raise AssertionError("Not a websocket 4008 error") from exc

        # Run both directions concurrently; the first failure propagates.
        await asyncio.gather(send(), receive())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment