Skip to content

Instantly share code, notes, and snippets.

@hamees-sayed
Created April 17, 2025 07:53
Show Gist options
  • Save hamees-sayed/88a43fe38d9dd634edc7fe002768609f to your computer and use it in GitHub Desktop.
Save hamees-sayed/88a43fe38d9dd634edc7fe002768609f to your computer and use it in GitHub Desktop.
import httpx
from dotenv import load_dotenv
from deepgram import DeepgramClient, LiveTranscriptionEvents, LiveOptions
import threading
# Load variables from a local .env file (if present) into the process
# environment before anything else runs.
load_dotenv()
# Audio source that main() fetches with httpx and forwards to Deepgram.
# NOTE(review): judging by the hostname/path this is the BBC World Service
# live radio stream — confirm the endpoint is still served.
URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"
def main():
    """Stream audio from ``URL`` to Deepgram and print transcripts.

    Opens a Deepgram live-transcription websocket, registers a callback
    that prints each transcript, then forwards the HTTP audio stream to the
    websocket from a background thread until the user presses Enter.

    Any ``Exception`` raised along the way is reported on stdout rather
    than propagated; the function always returns ``None``.
    """
    # Function-scope import so this block does not depend on edits to the
    # file-level import section.
    import os

    try:
        # Prefer a key supplied via the environment / .env file (the module
        # calls load_dotenv() at import time); fall back to the original
        # placeholder so behaviour is unchanged when the variable is unset.
        api_key = os.getenv("DEEPGRAM_API_KEY", "YOUR_API_KEY")
        deepgram = DeepgramClient(api_key=api_key)
        dg_connection = deepgram.listen.websocket.v("1")

        def on_message(self, result, **kwargs):
            # Print the top alternative of every transcript event.
            print(f"speaker: {result.channel.alternatives[0].transcript}")

        dg_connection.on(LiveTranscriptionEvents.Transcript, on_message)

        # NOTE(review): `encoding="linear16"` declares raw PCM, but URL looks
        # like a compressed radio stream — confirm against the Deepgram
        # streaming docs whether `encoding` should be omitted here.
        options = LiveOptions(
            model="nova-3",
            language="multi",
            punctuate=True,
            encoding="linear16",
        )

        # Do not start pumping audio into a connection that failed to open.
        if dg_connection.start(options) is False:
            print("Failed to start connection")
            return

        # An Event makes the cross-thread stop signal explicit instead of
        # relying on a rebound closure variable.
        stop_event = threading.Event()

        def myThread():
            # Forward the HTTP audio stream chunk by chunk. The stop signal
            # is only checked when a new chunk arrives.
            with httpx.stream("GET", URL) as r:
                for data in r.iter_bytes():
                    if stop_event.is_set():
                        break
                    dg_connection.send(data)

        worker_thread = threading.Thread(target=myThread)
        worker_thread.start()
        try:
            input("\nPress Enter to stop...\n")
        finally:
            # Always stop the worker and close the websocket, even when
            # input() is interrupted (EOF, Ctrl-C).
            stop_event.set()
            worker_thread.join()
            dg_connection.finish()

        print("Finished")
    except Exception as e:
        print(f"Could not open socket: {e}")
# Script entry point: run the transcription demo only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment