Skip to content

Instantly share code, notes, and snippets.

@fsndzomga
Last active September 23, 2023 03:35
Show Gist options
  • Save fsndzomga/c9bc7d2bf3cba07a8096475ed17887f6 to your computer and use it in GitHub Desktop.
Save fsndzomga/c9bc7d2bf3cba07a8096475ed17887f6 to your computer and use it in GitHub Desktop.
AI agent class
class Agent():
    """Step-by-step agent that asks a language model to plan and execute a request.

    Calling the agent with a user request repeatedly asks ``self.llm`` for the
    next step, executes it (optionally through one of ``self.tools``), records
    the step and its response in ``self.memory`` and finally asks the model for
    an answer once a step containing ``'break'`` is produced.
    """

    # Id the model is told to return when no tool is needed (see execute_step).
    _NO_TOOL_ID = 10000

    def __init__(self) -> None:
        """Create the language model, the per-request memory and the tool list."""
        self.llm = OpenaiLanguageModel(anonymize=False)
        # Maps hash(user_input) -> list of {"step": ..., "response": ...} dicts.
        self.memory = {}
        # Define standard tools
        # NOTE(review): the instance is never used directly; presumably creating
        # a Tool registers it so that Tool.all() returns it -- confirm.
        calc_tool = Tool("Calculator", "Performs basic mathematical operations", Agent.calculator)  # noqa
        self.tools = Tool.all()

    def __call__(self, user_input: str, *, max_steps: Optional[int] = None) -> str:
        """Plan and execute steps for ``user_input`` and return the final answer.

        Progress (the input, each step, each response and the final answer) is
        printed as it happens.

        Args:
            user_input: The request to answer.
            max_steps: Optional upper bound on the number of executed steps.
                ``None`` (the default) keeps looping until the model returns a
                step containing ``'break'``.

        Returns:
            Whatever ``self.llm.generate`` returns for the final-answer prompt.
        """
        tool_use_prompt = " "
        if len(self.tools) > 0:
            tool_list = ', '.join([repr(tool) for tool in self.tools])
            tool_use_prompt = f"You can use the following tools: {tool_list}"

        step_prompt = (
            "You are a useful AI Agent.\n"
            "Given the following user input: {}\n"
            "What is the first step you should take to answer the user request\n"
            "correctly ?\n"
            "{}"
        ).format(user_input, tool_use_prompt)

        # Initialize memory for this user input
        input_key = hash(user_input)
        self.memory[input_key] = []

        print(user_input, "\n")
        print("Thinking...", "\n")

        step_dict = self.llm.generate(step_prompt, output_format=StepBase)
        steps_taken = 0

        while True:
            step = step_dict["step"]
            budget_exhausted = max_steps is not None and steps_taken >= max_steps

            # NOTE(review): this is a substring match, so a step such as
            # "break down the problem" also ends the loop.
            if 'break' in step.lower() or budget_exhausted:
                final_response_prompt = (
                    "\n"
                    "Given all this: {}\n"
                    "Give the final answer to this user request: {}\n"
                ).format(self._format_history(input_key), user_input)
                final_response = self.llm.generate(final_response_prompt)
                print(final_response, "\n")
                # Return the answer instead of only printing it, as the
                # ``-> str`` annotation promises.
                return final_response

            print(step, "\n")
            print("Thinking...", "\n")

            response = self.execute_step(step)
            self.memory[input_key].append({"step": step, "response": response})
            steps_taken += 1

            print(response, "\n")
            print("Thinking...", "\n")

            step_prompt = (
                "Given the following user input: {}\n"
                "You already took the following step or steps\n"
                "{}. Is is enough to respond to the user?\n"
                "If not what is the next step ? Return a step named 'break' if the problem is solved"
            ).format(user_input, self._format_history(input_key))
            step_dict = self.llm.generate(step_prompt, output_format=StepBase)

    def _format_history(self, input_key) -> str:
        """Render the recorded steps for ``input_key`` as ``step -> response`` pairs."""
        return " ,".join(
            f"{item['step']} -> {item['response']}"
            for item in self.memory[input_key]
        )

    def _lookup_tool(self, tool_id):
        """Return the tool stored under ``tool_id`` or ``None`` if there is none.

        The id comes from the language model, so the sentinel, an out-of-range
        id or an unusable value all mean "do not use a tool" instead of
        raising.
        """
        if tool_id == self._NO_TOOL_ID:
            return None
        try:
            return self.tools[tool_id]
        except (LookupError, TypeError):
            return None

    def execute_step(self, step):
        """Execute one planned step, through a tool if the model selects one.

        When tools are available the model is asked for a tool id. If the id
        resolves to a tool, the model is asked for JSON parameters matching the
        tool function's signature and the tool is called with them. Otherwise
        the model is asked to execute the step directly.

        Args:
            step: Description of the step to execute.

        Returns:
            The tool's return value, or the model's response when no tool is
            used.

        Raises:
            json.JSONDecodeError: If the model's parameter response is not
                valid JSON.
        """
        if len(self.tools) > 0:
            execute_prompt = (
                " Given the following task: {}\n"
                "And the following tools: {}\n"
                "Do you need to use a tool to do this?\n"
                "If yes return the id of the tool.\n"
                "If not return 10000 as id\n"
            ).format(step, self.tools)

            tool_id_to_use = self.llm.generate(execute_prompt,
                                               output_format=ToolId)["id"]
            tool = self._lookup_tool(tool_id_to_use)

            if tool is not None:
                # Get the function's signature
                sig = inspect.signature(tool._call_function)
                source_code = inspect.getsource(tool._call_function)
                params = {k: (v.annotation, ...) for k, v in sig.parameters.items()}

                # Create a dynamic BaseModel for the function's parameters;
                # only its annotations are used, to describe the expected JSON.
                DynamicModel = create_model("DynamicModel", **params)

                function_params = self.llm.generate(
                    f"Provide parameters for tool {repr(tool)} to solve this task: {step}. "
                    f"Knowing that this is the source code of the function the tool uses: {source_code}. "
                    f"Your response should be a JSON respecting the following structure {DynamicModel.__annotations__} "
                    f"with the correct values for the problem at hand."
                )
                parsed_data = json.loads(function_params)

                # Call the tool with the provided parameters
                return tool(**parsed_data)

        return self.llm.generate(f"Given your current knowledge. Execute this step:{step}")

    @staticmethod
    def calculator(operation: str, value1: Union[int, float], value2: Optional[Union[int, float]] = None) -> float:
        """Apply a basic arithmetic operation.

        Args:
            operation: One of ``"add"``, ``"subtract"``, ``"multiply"``,
                ``"divide"``, ``"square_root"``, ``"square"`` or ``"power"``.
            value1: First operand (the only operand for ``"square_root"`` and
                ``"square"``).
            value2: Second operand; required for the two-operand operations.

        Returns:
            The result of the operation.

        Raises:
            ValueError: If ``operation`` is not supported, or a two-operand
                operation is requested without ``value2``.
            ZeroDivisionError: If ``"divide"`` is requested with ``value2 == 0``.
        """
        if operation == "power":
            if value2 is None:
                raise ValueError("For the power operation, value2 (exponent) is required.")
            return value1 ** value2

        if operation in ("add", "subtract", "multiply", "divide"):
            # Without this guard a missing operand surfaces as an opaque
            # TypeError from the arithmetic on None.
            if value2 is None:
                raise ValueError(f"For the {operation} operation, value2 is required.")
            if operation == "add":
                return value1 + value2
            if operation == "subtract":
                return value1 - value2
            if operation == "multiply":
                return value1 * value2
            return value1 / value2

        if operation == "square_root":
            return value1 ** 0.5
        if operation == "square":
            return value1 ** 2

        raise ValueError(f"Unsupported operation: {operation}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment