Created
October 30, 2023 01:17
-
-
Save ranfysvalle02/b835e3643fceec00bafe23917684df60 to your computer and use it in GitHub Desktop.
ActionWeaver Agent for MongoDB Atlas
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Module setup: imports, API credentials, the shared MongoDB client and logger.
import itertools
import json
import logging
import os
from typing import List

import openai
from pymongo import MongoClient

from actionweaver import action, RequireNext, SelectOne
from actionweaver import *
from actionweaver.llms.openai.chat import OpenAIChatCompletion
from actionweaver.llms.openai.tokens import TokenUsageTracker

# Credentials are read from the environment when available so real secrets
# never have to be committed; the original placeholder literals are kept as
# fallbacks, so behaviour is unchanged when the variables are unset.
openai.api_key = os.getenv("OPENAI_API_KEY", "sk-")

# NOTE(review): the fallback URI below looks mangled by e-mail obfuscation
# ("[email protected]") and is unlikely to be a usable connection string --
# set MONGODB_URI to a real Atlas URI.
connection_string = os.getenv(
    "MONGODB_URI",
    "mongodb+srv://abc123:[email protected]/?retryWrites=true&w=majority",
)

# Single client shared by every action defined in this module.
mongodb_client = MongoClient(connection_string)

logger = logging.getLogger(__name__)
def print_output(output):
    """Print an LLM response, streaming it piece by piece when possible.

    Args:
        output: Either an ``itertools.tee`` iterator of streamed chunks, or
            any other object. Each streamed chunk is indexed as
            ``chunk["choices"][0]`` and may carry its text under
            ``["delta"]["content"]``.

    Streamed content is written without separators or a trailing newline so
    the pieces join into one message. Anything that is not a tee iterator is
    printed as-is with a normal ``print``.
    """
    # isinstance instead of an exact type() comparison: the idiomatic check.
    if isinstance(output, itertools._tee):
        for chunk in output:
            choices = chunk["choices"]
            if not choices:
                # A chunk without choices has nothing to print; skipping it
                # avoids an IndexError on the [0] lookup.
                continue
            content = choices[0].get("delta", {}).get("content")
            if content is not None:
                print(content, end='')
    else:
        print(output)
class AgentV0:
    """Base agent holding the chat model, token tracker and conversation.

    Attributes set by the constructor:
        logger: The logger handed in by the caller.
        token_tracker: ``TokenUsageTracker`` created with ``budget=None``.
        llm: ``OpenAIChatCompletion`` for ``"gpt-3.5-turbo"`` wired to the
            tracker and logger above.
        messages: Conversation history, seeded with two system prompts.
        times: An empty list; nothing in this class writes to it.
    """

    def __init__(self, logger):
        # The two system prompts that open every conversation, in order.
        system_prompts = (
            "You are a resourceful AI assistant, Your specialization is answering questions about MongoDB Atlas environments.",
            "Think critically and step by step. You already have access to the MongoDB Atlas environment, and have the tools at your disposal to answer the question.",
        )

        self.logger = logger
        self.token_tracker = TokenUsageTracker(budget=None, logger=logger)
        self.llm = OpenAIChatCompletion(
            "gpt-3.5-turbo",
            token_usage_tracker=self.token_tracker,
            logger=logger,
        )
        self.messages = [
            {"role": "system", "content": prompt} for prompt in system_prompts
        ]
        self.times = []
class MongoDBUtility(AgentV0):
    """Agent exposing MongoDB inspection helpers as ActionWeaver actions.

    The ``@action`` methods list databases, list collections, sample a
    collection and run an aggregation pipeline against the module-level
    ``mongodb_client``. Calling the instance appends a user message to the
    conversation and asks the LLM for a streamed reply with those actions
    available.

    NOTE(review): ActionWeaver appears to use each action's docstring as the
    tool description sent to the model -- confirm before rewording them. They
    are therefore kept verbatim here.
    """

    def truncate_string(self, s, max_length=1):
        """Return the first ``max_length`` characters of ``s`` (default: 1)."""
        return s[:max_length]

    def round_float(self, f, decimal_places=1):
        """Return ``f`` rounded to ``decimal_places`` places (default: 1)."""
        return round(f, decimal_places)

    def limit_list(self, l, max_length=1):
        """Return the first ``max_length`` items of ``l`` (default: 1)."""
        return l[:max_length]

    def limit_dict(self, d, max_keys=5):
        """Return a new dict with the first ``max_keys`` entries of ``d``."""
        return {k: d[k] for k in list(d)[:max_keys]}

    def process_value(self, value):
        """Shrink a single document value so a sample stays small.

        Strings are truncated, floats rounded, lists and dicts limited, all
        using the helpers' default sizes. Any other value is returned
        unchanged.
        """
        if isinstance(value, str):
            return self.truncate_string(value)
        if isinstance(value, float):
            return self.round_float(value)
        if isinstance(value, list):
            return self.limit_list(value)
        if isinstance(value, dict):
            return self.limit_dict(value)
        return value

    @staticmethod
    def _normalize_pipeline(pipeline):
        """Coerce a model-supplied pipeline into a list of stages.

        The ``pipeline`` argument of ``answer_question`` is untyped, so it
        may arrive as a JSON string or as a single stage dict. A string is
        parsed with ``json.loads`` (which raises ``json.JSONDecodeError`` on
        invalid JSON) and a lone dict is wrapped in a list. Anything else is
        returned untouched.
        """
        if isinstance(pipeline, str):
            pipeline = json.loads(pipeline)
        if isinstance(pipeline, dict):
            pipeline = [pipeline]
        return pipeline

    @action(name="handle_mdb", orch_expr = SelectOne(["handle_mdb","atlas_contents","db_contents","coll_contents"]))
    def handle_mdb(self, instruction: str) -> str:
        """
        Invoke this ALWAYS. Put every context in the instruction only!
        Args:
        instruction (str): The user's instruction
        Returns:
        str: The response to the user's question.
        """
        return instruction

    @action(name="atlas_contents", orch_expr = RequireNext(["atlas_contents"]))
    def atlas_contents(self, instruction: str) -> str:
        """
        Invoke this if you need to get the contents of a MongoDB Atlas cluster.
        Args:
        instruction (str): The user's instruction
        Returns:
        str: The available MongoDB databases
        """
        self.logger.info("atlas_contents")
        db_dict = {'database_names': mongodb_client.list_database_names()}
        message = f"Here are the available MongoDB databases:\n{db_dict}"
        # Echo the tool result to stdout so the run can be followed by eye.
        print(
            "\n\n====atlas_contents====\n\n",
            message,
            "\n\n====atlas_contents====\n\n"
        )
        return message

    @action(name="db_contents", orch_expr = RequireNext(["db_contents"]))
    def db_contents(self, db_name: str) -> str:
        """
        Invoke this if you need to get the contents of a MongoDB database.
        Args:
        db_name (str): The name of the MongoDB database
        Returns:
        str: the available MongoDB collections in the requested database.
        """
        self.logger.info("db_contents")
        coll_names = mongodb_client[db_name].list_collection_names()
        message = f"Here are the available MongoDB collections in the requested database:\n{coll_names}"
        print(
            "\n\n====db_contents====\n\n",
            message,
            "\n\n====db_contents====\n\n"
        )
        return message

    @action(name="coll_contents", orch_expr = RequireNext(["coll_contents"]))
    def coll_contents(self, db_name: str, coll: str) -> str:
        """
        Invoke this if you need to directly access the contents of MongoDB collection.
        Args:
        db_name (str): The name of the MongoDB database
        coll (str): The name of the MongoDB collection
        Returns:
        str: The sample of whats in the collection
        """
        self.logger.info("coll_contents")
        collection = mongodb_client[db_name][coll]

        # Pull one random document via the $sample aggregation stage.
        samples = list(collection.aggregate([{"$sample": {"size": 1}}]))

        # Keep only fields of these plain types at the top level, shrinking
        # each kept value with process_value.
        plain_types = (str, int, float, bool, list, dict, type(None))
        minimal_samples = [
            {
                key: self.process_value(value)
                for key, value in sample.items()
                if isinstance(value, plain_types)
            }
            for sample in samples
        ]

        # default=str stringifies any nested value json cannot encode itself.
        coll_details = json.dumps(minimal_samples, default=str)
        print(
            "\n\n====coll_contents====\n\n",
            f"Here is whats in the collection '{coll}' in the database '{db_name}':\n{coll_details}",
            "\n\n====coll_contents====\n\n"
        )
        return f"Here is a sample of whats in the collection:\n{coll_details}"

    # The "Returns" line in the docstring below is identical to the one in
    # coll_contents; this action actually returns the aggregation results.
    @action(name="answer_question", orch_expr = RequireNext(["answer_question"]))
    def answer_question(self, db_name: str, coll: str, Q: str, pipeline) -> str:
        """
        Invoke this to answer a question related to the contents of MongoDB collection.
        Args:
        db_name (str): The name of the MongoDB database
        coll (str): The name of the MongoDB collection
        Q (str): The question that needs to be answered.
        pipeline: A MongoDB aggregation pipeline that best answers the question
        Returns:
        str: The sample of whats in the collection
        """
        self.logger.info("answer_question")
        collection = mongodb_client[db_name][coll]

        # Accept a JSON-encoded pipeline or a single stage dict as well as a
        # ready-made list of stages.
        pipeline = self._normalize_pipeline(pipeline)

        banner = f"\n\n====PIPELINE FOR {Q}====\n\n"
        print(banner, pipeline, banner)

        # SECURITY NOTE: the pipeline is produced by the model and is run
        # as-is. Aggregation stages such as $out / $merge write data, so run
        # this with read-only database credentials.
        samples = list(collection.aggregate(pipeline))
        return f"Here is the database result to support answering the question '{Q}'\nMongoDB Results:{samples}"

    def __call__(self, text):
        """Add ``text`` as a user message and return the streamed LLM reply.

        The message is appended to ``self.messages`` (so history accumulates
        across calls) and the model is invoked with all five actions
        available and ``stream=True``.
        """
        self.messages.append({"role": "user", "content": text})
        available_actions = [
            self.handle_mdb,
            self.atlas_contents,
            self.db_contents,
            self.coll_contents,
            self.answer_question,
        ]
        return self.llm.create(
            messages=self.messages,
            actions=available_actions,
            stream=True,
        )
# Demo conversation: one agent answers the questions in order, so each reply
# is produced with the earlier exchanges already in its message history.
agent = MongoDBUtility(logger)

for question in (
    "Tell me what's in MongoDB Atlas.",
    "What are the collections in sample_mflix?",
    "What is inside the movies collection?",
    "What is the best movie?",
    "What is the best Animation movie from the USA in the year 1999?",
    "Can you change your aggregation pipeline to check for Comedy and Animation movies?",
):
    print_output(agent(question))
Author
ranfysvalle02
commented
Oct 30, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment