from __future__ import annotations

from typing import Any, List, Callable, Optional, Union
from mytoken import apikey
import os
from anonLLM.llm import OpenaiLanguageModel
from pydantic import BaseModel, create_model
import inspect
import json

os.environ["OPENAI_API_KEY"] = apikey


class StepBase(BaseModel):
    step: str


class ToolId(BaseModel):
    id: int


class Tool():
    _all_tools = []

    def __init__(self, name, description, call_function: Callable[..., Any]) -> None:
        self.name = name
        self.description = description
        self._call_function = call_function
        Tool._all_tools.append(self)  # every tool registers itself in the shared registry

    @classmethod
    def all(cls) -> List[Tool]:
        return cls._all_tools

    def __call__(self, *args: Any, **kwargs: Any) -> str:
        return self._call_function(*args, **kwargs)

    def __repr__(self) -> str:
        tool_id = Tool._all_tools.index(self)
        return f"Tool({self.name}): {self.description}, Tool id = {tool_id}"


class Agent():
    def __init__(self) -> None:
        self.llm = OpenaiLanguageModel(anonymize=False)
        self.memory = {}
        # Define standard tools
        calc_tool = Tool("Calculator", "Performs basic mathematical operations", Agent.calculator)  # noqa
        self.tools = Tool.all()

    def __call__(self, user_input: str) -> str:
        tool_use_prompt = " "
        if len(self.tools) > 0:
            tool_list = ', '.join([repr(tool) for tool in self.tools])
            tool_use_prompt = f"You can use the following tools: {tool_list}"
        step_prompt = '''You are a useful AI Agent.
        Given the following user input: {}
        What is the first step you should take to answer the user request
        correctly?
        {}'''.format(user_input, tool_use_prompt)
        # Initialize memory for this user input
        input_key = hash(user_input)
        self.memory[input_key] = []
        print(user_input, "\n")
        print("Thinking...", "\n")
        step_dict = self.llm.generate(step_prompt, output_format=StepBase)
        while True:
            step = step_dict["step"]
            if 'break' in step.lower():
                # The model signalled that no further steps are needed
                final_response_prompt = '''
                Given all this: {}
                Give the final answer to this user request: {}
                '''.format(", ".join(f"{item['step']} -> {item['response']}" for item in self.memory[input_key]),
                           user_input)
                final_response = self.llm.generate(final_response_prompt)
                print(final_response, "\n")
                return final_response
            print(step, "\n")
            print("Thinking...", "\n")
            response = self.execute_step(step)
            self.memory[input_key].append(
                {
                    "step": step,
                    "response": response
                }
            )
            print(response, "\n")
            print("Thinking...", "\n")
            step_prompt = '''Given the following user input: {}
            You already took the following step or steps:
            {}. Is it enough to respond to the user?
            If not, what is the next step? Return a step named 'break' if the problem is solved'''.format(
                user_input,
                ", ".join(f"{item['step']} -> {item['response']}" for item in self.memory[input_key]))
            step_dict = self.llm.generate(step_prompt, output_format=StepBase)

    def execute_step(self, step):
        if len(self.tools) > 0:
            execute_prompt = '''Given the following task: {}
            And the following tools: {}
            Do you need to use a tool to do this?
            If yes, return the id of the tool.
            If not, return 10000 as id
            '''.format(step, self.tools)
            tool_id_to_use = self.llm.generate(execute_prompt,
                                               output_format=ToolId)["id"]
            if tool_id_to_use != 10000:
                tool = self.tools[tool_id_to_use]
                # Get the tool function's signature and source code
                sig = inspect.signature(tool._call_function)
                source_code = inspect.getsource(tool._call_function)
                params = {k: (v.annotation, ...) for k, v in sig.parameters.items()}
                # Create a dynamic BaseModel describing the function's parameters
                DynamicModel = create_model("DynamicModel", **params)
                # Ask the LLM to fill in the parameter values as JSON matching that structure
                function_params = self.llm.generate(f"Provide parameters for tool {repr(tool)} to solve this task: {step}. Knowing that this is the source code of the function the tool uses: {source_code}. Your response should be a JSON respecting the following structure {DynamicModel.__annotations__} with the correct values for the problem at hand.")
                parsed_data = json.loads(function_params)
                # Call the tool with the provided parameters
                response = tool(**parsed_data)
                return response
        return self.llm.generate(f"Given your current knowledge, execute this step: {step}")

    @staticmethod
    def calculator(operation: str, value1: Union[int, float], value2: Optional[Union[int, float]] = None) -> float:
        # Operations that combine two operands require value2
        if operation in ("add", "subtract", "multiply", "divide", "power") and value2 is None:
            raise ValueError(f"For the {operation} operation, value2 is required.")
        if operation == "add":
            return value1 + value2
        elif operation == "subtract":
            return value1 - value2
        elif operation == "multiply":
            return value1 * value2
        elif operation == "divide":
            return value1 / value2
        elif operation == "square_root":
            return value1 ** 0.5
        elif operation == "square":
            return value1 ** 2
        elif operation == "power":
            return value1 ** value2
        else:
            raise ValueError(f"Unsupported operation: {operation}")


agent = Agent()
response = agent("What is the square root of 44494949494949494")
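

# --- Illustrative extension (not part of the original gist) ---
# A minimal sketch of registering a second tool with the Tool/Agent API above.
# The "Reverser" tool and the sample query are hypothetical; they only show that
# a new Tool registers itself in the shared registry, and since Agent.tools is
# the same list returned by Tool.all(), the existing agent can use it immediately.
def reverse_string(text: str) -> str:
    # Hypothetical helper: return the characters of the text in reverse order
    return text[::-1]


reverse_tool = Tool("Reverser", "Reverses the characters of a string", reverse_string)  # noqa
reversed_answer = agent("Reverse the word 'agent'")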