Last active
November 16, 2023 10:49
-
-
Save breezedeus/718ac81ef4e8a411b16588549d37b929 to your computer and use it in GitHub Desktop.
A simple script to run the newest OpenAI Assistants Functions.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # coding: utf-8 | |
| # !pip install -U openai | |
| import time | |
| from copy import deepcopy | |
| import openai | |
| from openai._types import NOT_GIVEN | |
| openai.api_key = '<YOUR_API_KEY>' | |
| DEFAULT_ASSISTANT_CONFIG = dict( | |
| name="My Private Assistant", | |
| description="You are My Private Assistant. Answer my questions precisely and accurately.", | |
| model="gpt-3.5-turbo-1106", | |
| tools=[{"type": "code_interpreter"}, {"type": "retrieval"}], | |
| ) | |
| class OpenAIAssistant(object): | |
| def __init__(self, assistant_config, files=None): | |
| _config = deepcopy(DEFAULT_ASSISTANT_CONFIG) | |
| _config.update(assistant_config) | |
| self._assistant_name = _config['name'] | |
| files = files or [] | |
| self.init_assistant(_config, files) | |
| def init_assistant(self, assistant_config, fps): | |
| file_ids = NOT_GIVEN | |
| if fps: | |
| file_ids = [] | |
| for fp in fps: | |
| file = openai.files.create(file=open(fp, "rb"), purpose='assistants') | |
| file_ids.append(file.id) | |
| self.assistant = openai.beta.assistants.create( | |
| name=assistant_config["name"], | |
| description=assistant_config["description"], | |
| model=assistant_config["model"], | |
| tools=assistant_config["tools"], | |
| file_ids=file_ids, | |
| ) | |
| self.thread = openai.beta.threads.create() | |
| def talk(self): | |
| while True: | |
| question = input('You ==>: ') | |
| if question.lower().strip() in ('end', 'quit'): | |
| break | |
| response = self.ask(question) | |
| print(f'{self._assistant_name} ==>: {response}') | |
| def ask(self, question: str): | |
| thread_message = openai.beta.threads.messages.create( | |
| self.thread.id, role="user", content=question, | |
| ) | |
| # print(thread_message) | |
| run = openai.beta.threads.runs.create( | |
| thread_id=self.thread.id, assistant_id=self.assistant.id | |
| ) | |
| while True: | |
| response = openai.beta.threads.runs.retrieve( | |
| thread_id=self.thread.id, run_id=run.id, | |
| ) | |
| if response.status in ('completed', 'failed', 'cancelled', 'expired'): | |
| res_status = response.status | |
| break | |
| time.sleep(1) | |
| thread_messages = openai.beta.threads.messages.list(self.thread.id) | |
| if res_status == 'completed': | |
| res, citations = self.extract_msg_response(thread_messages.data, run) | |
| if len(citations) > 0: | |
| print('\tcitations: {}'.format(citations)) | |
| return res | |
| else: | |
| return f'Bad response status: {res_status}' | |
| @classmethod | |
| def extract_msg_response(cls, messages, run): | |
| msgs = [msg for msg in messages if getattr(msg, 'run_id') == run.id] | |
| if len(msgs) < 1: | |
| print('Warning: something went wrong, please check it manually.') | |
| breakpoint() | |
| message = msgs[0] | |
| # Extract the message content | |
| message_content = message.content[0].text | |
| annotations = message_content.annotations | |
| citations = [] | |
| # Iterate over the annotations and add footnotes | |
| for index, annotation in enumerate(annotations): | |
| # Replace the text with a footnote | |
| message_content.value = message_content.value.replace( | |
| annotation.text, f' [{index}]' | |
| ) | |
| # Gather citations based on annotation attributes | |
| if (file_citation := getattr(annotation, 'file_citation', None)) : | |
| cited_file = openai.files.retrieve(file_citation.file_id) | |
| citations.append( | |
| f'[{index}] {file_citation.quote} from {cited_file.filename}' | |
| ) | |
| elif (file_path := getattr(annotation, 'file_path', None)) : | |
| cited_file = openai.files.retrieve(file_path.file_id) | |
| citations.append( | |
| f'[{index}] Click <here> to download {cited_file.filename}' | |
| ) | |
| # Note: File download functionality not implemented above for brevity | |
| # Add footnotes to the end of the message before displaying to user | |
| # message_content.value += '\n' + '\n'.join(citations) | |
| return message_content.value, citations | |
| if __name__ == '__main__': | |
| # # The easiest way to get started: | |
| # files = None | |
| # assistant_config = { | |
| # "name": "CnOCR Assistant", | |
| # } | |
| # A more complex example: | |
| files = ["/Users/king/Downloads/cnocr.md"] # just set to `None` if no files are needed | |
| description = """ | |
| CnOCR is a text recognition (Optical Character Recognition, referred to as OCR) toolkit under Python 3, | |
| which supports the recognition of common characters in Simplified Chinese, Traditional Chinese (some models), | |
| English and numbers, and the recognition of vertical text. | |
| Comes with 20+ trained recognition models for different application scenarios, | |
| and can be used directly after installation. The author of CnOCR is Breezedeus (https://www.breezedeus.com). | |
| """ # noqa | |
| assistant_config = { | |
| "name": "CnOCR Assistant", | |
| "description": description, | |
| "model": "gpt-3.5-turbo-1106", | |
| "tools": [{"type": "code_interpreter"}, {"type": "retrieval"}], | |
| } | |
| assistant = OpenAIAssistant(assistant_config, files) | |
| assistant.talk() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment