Skip to content

Instantly share code, notes, and snippets.

@rhymiz
Last active August 9, 2024 04:16
Show Gist options
  • Save rhymiz/de7e91ec46f0819424d4b50b1e2f1c06 to your computer and use it in GitHub Desktop.
Save rhymiz/de7e91ec46f0819424d4b50b1e2f1c06 to your computer and use it in GitHub Desktop.
Fork an OpenAI AsyncStream so that multiple consumers can process the same streamed chunks
import asyncio
from typing import Any, AsyncGenerator, Literal, Union

from openai import AsyncOpenAI, AsyncStream
from openai.types.chat.chat_completion_chunk import ChatCompletionChunk
# The async client is required: `_fork_stream` consumes the response with
# `async for`, so `chat.completions.create(..., stream=True)` must return an
# `AsyncStream` (awaited in `get_completion`).
oai = AsyncOpenAI()
async def _fork_stream(
    self, stream: "AsyncStream[ChatCompletionChunk]"
) -> "tuple[AsyncGenerator[ChatCompletionChunk, None], AsyncGenerator[ChatCompletionChunk, None]]":
    """
    Fork the response stream so that two consumers each receive every chunk.

    A background task iterates ``stream`` and copies each chunk into two
    unbounded queues, one per returned generator. Because the queues are
    unbounded, the two generators may be consumed concurrently or one after
    the other; chunks not yet read are buffered in memory.

    Args:
        self: Unused; kept so the signature matches the method this helper was
            lifted from. Module-level callers may pass ``None``.
        stream: Async iterable of chunks to fork.

    Returns:
        A pair of async generators, each yielding every chunk of ``stream`` in
        order. If iterating ``stream`` raised, each generator re-raises that
        exception after yielding the chunks received before it. If the
        background task was cancelled, each generator raises
        ``asyncio.CancelledError`` at the end instead of finishing quietly.
    """
    queues = (asyncio.Queue(), asyncio.Queue())

    async def producer() -> None:
        try:
            async for chunk in stream:
                for queue in queues:
                    # The queues are unbounded, so put_nowait never raises QueueFull.
                    queue.put_nowait(chunk)
        finally:
            # Exactly one end-of-stream marker per queue, whether the stream
            # finished, failed or was cancelled, so no consumer waits forever.
            for queue in queues:
                queue.put_nowait(None)

    producer_task = asyncio.create_task(producer())

    async def consumer(
        queue: "asyncio.Queue[ChatCompletionChunk | None]",
    ) -> "AsyncGenerator[ChatCompletionChunk, None]":
        while (chunk := await queue.get()) is not None:
            yield chunk
        # End of stream: awaiting the producer re-raises its exception (if any)
        # here instead of leaving it unobserved on a background task. Referencing
        # the task from the generators also keeps it strongly referenced while
        # either of them is alive (the event loop holds only weak references).
        await producer_task

    return consumer(queues[0]), consumer(queues[1])
async def process_fork(stream: "AsyncGenerator[ChatCompletionChunk, None]") -> None:
    """
    Consume one branch of a forked stream.

    Placeholder implementation: replace the loop body with real per-chunk
    processing. The branch is always iterated to the end so it runs to
    completion.
    """
    async for chunk in stream:
        # Process `chunk` here.
        pass
async def get_completion(
    messages: list[dict[str, Any]], **create_kwargs: Any
) -> AsyncGenerator[ChatCompletionChunk, None]:
    """
    Start a streaming chat completion and fork it into two consumers.

    One branch is handed to ``process_fork`` on a background task. The task
    runs concurrently on the same event loop, not in another thread. The other
    branch is returned to the caller.

    Args:
        messages: Chat messages passed to ``oai.chat.completions.create``.
        **create_kwargs: Extra keyword arguments forwarded to
            ``oai.chat.completions.create`` (for example ``model=...``).

    Returns:
        An async generator yielding every chunk of the response. After the
        last chunk it awaits the background ``process_fork`` task, so the caller's
        iteration ends only once that task has finished. Any exception the task
        raised surfaces to the caller.

    To process both branches back-to-back instead, without a background task::

        stream_1, stream_2 = await _fork_stream(None, stream)
        async for chunk in stream_1:
            ...  # first pass
        async for chunk in stream_2:
            ...  # second pass over the same chunks
    """
    stream = await oai.chat.completions.create(
        **create_kwargs, messages=messages, stream=True
    )
    # `_fork_stream` takes an unused `self` as its first parameter.
    stream_1, stream_2 = await _fork_stream(None, stream)

    # Keep a reference to the task: the event loop only holds weak references,
    # so an unreferenced task can be garbage-collected before it finishes.
    background = asyncio.create_task(process_fork(stream_2))

    async def primary() -> AsyncGenerator[ChatCompletionChunk, None]:
        async for chunk in stream_1:
            yield chunk
        # Join the background consumer so its failure is not silently dropped.
        await background

    return primary()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment