Skip to content

Instantly share code, notes, and snippets.

@ranfysvalle02
Created February 4, 2024 07:20
Show Gist options
  • Save ranfysvalle02/8bce6520ce76924a2cc07ecdd08d46ee to your computer and use it in GitHub Desktop.
Save ranfysvalle02/8bce6520ce76924a2cc07ecdd08d46ee to your computer and use it in GitHub Desktop.
Chat with your MongoDB Atlas environment in a unique way
import logging
import os
import openai
import itertools
import inspect
from actionweaver import action
from typing import List
from pymongo import MongoClient
import json
connection_string="mongodb+srv://<user>:<password>@cluster0.<xxx>.mongodb.net/test"
mongodb_client = MongoClient(connection_string)
DEBUG=True
from actionweaver.llms.azure.chat import ChatCompletion
from actionweaver.utils.tokens import TokenUsageTracker
from actionweaver import action
logging.basicConfig(
filename='agent.log',
filemode='a',
format='%(asctime)s.%(msecs)04d %(levelname)s {%(module)s} [%(funcName)s] %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class AzureAgent:
def __init__(self, logger):
self.logger = logger
self.token_tracker = TokenUsageTracker(budget=None)
self.token_budget = 2000
self.llm = ChatCompletion(
model="gpt-4", azure_deployment="gpt-4",
azure_endpoint="https://<demo>.openai.azure.com/", api_key="<api-key-here>",
api_version="2023-10-01-preview",
token_usage_tracker = TokenUsageTracker(budget=self.token_budget),
logger=logger)
self.messages = [
{"role": "system", "content": "You are a resourceful AI assistant, Your specialization is answering questions about MongoDB Atlas environments."},
{"role": "system", "content": "Think critically and step by step. You already have access to the MongoDB Atlas environment, and have the tools at your disposal to answer the question."},
{"role":"system", "content":" [IMPORTANT: NEVER ANSWER DIRECTLY! ALWAYS USE YOUR AVAILABLE TOOLS!]"},
{"role":"assistant", "content":" [IMPORTANT: I WILL NEVER ANSWER DIRECTLY! I WILL ALWAYS USE MY AVAILABLE TOOLS!]"},
]
self.times = []
class MongoDBUtility(AzureAgent):
def truncate_string(self,s, max_length=1):
return s[:max_length]
def round_float(self,f, decimal_places=1):
return round(f, decimal_places)
def limit_list(self,l, max_length=1):
return l[:max_length]
def limit_dict(self,d, max_keys=5):
return {k: d[k] for k in list(d.keys())[:max_keys]}
def process_value(self,value):
if isinstance(value, str):
return self.truncate_string(value)
elif isinstance(value, float):
return self.round_float(value)
elif isinstance(value, list):
return self.limit_list(value)
elif isinstance(value, dict):
return self.limit_dict(value)
else:
return value
@action(name="handle_mdb")
def handle_mdb(self, userprompt: str) -> str:
"""
Invoke this to respond to every user prompt.
Args:
userprompt (str): The user's prompt
Returns:
str: The userprompt
"""
return userprompt
@action(name="atlas_contents")
def atlas_contents(self,instruction:str) -> List:
"""
Invoke this if you need to get the contents of a MongoDB Atlas cluster.
Args:
instruction (str): The user's instruction
Returns:
str: The available MongoDB databases
"""
logger.info(f"atlas_contents")
db_dict = {}
db_names = mongodb_client.list_database_names()
db_dict['database_names'] = db_names
if DEBUG:
print(
"\n\n====atlas_contents====\n\n",
f"Here are the available MongoDB databases:\n{db_dict}",
"\n\n====atlas_contents====\n\n"
)
return f"Here are the available MongoDB databases:\n{db_dict}"
@action(name="db_contents")
def db_contents(self,db_name:str) -> List:
"""
Invoke this if you need to get the contents of a MongoDB database.
Args:
db_name (str): The name of the MongoDB database
Returns:
str: the available MongoDB collections in the requested database.
"""
logger.info(f"db_contents")
database = mongodb_client[db_name]
coll_names = database.list_collection_names()
if DEBUG:
print(
"\n\n====db_contents====\n\n",
f"Here are the available MongoDB collections in the requested database:\n{coll_names}",
"\n\n====db_contents====\n\n"
)
return f"Here are the available MongoDB collections in the requested database:\n{coll_names}"
@action(name="coll_contents")
def coll_contents(self,db_name:str,coll:str) -> List:
"""
Invoke this if you need to directly access the contents of MongoDB collection.
Args:
db_name (str): The name of the MongoDB database
coll (str): The name of the MongoDB collection
Returns:
str: The sample of whats in the collection
"""
logger.info(f"coll_contents")
database = mongodb_client[db_name]
collection = database[coll]
# Sample the collection
pipeline = [{"$sample": {"size": 1}}]
samples = list(collection.aggregate(pipeline))
# Convert the samples to a minimal JSON object
minimal_samples = []
for sample in samples:
minimal_sample = {}
for key, value in sample.items():
if isinstance(value, (str, int, float, bool, list, dict, type(None))):
minimal_sample[key] = self.process_value(value)
minimal_samples.append(minimal_sample)
coll_details = json.dumps(minimal_samples, default=str)
if DEBUG:
print(
"\n\n====coll_contents====\n\n",
f"Here is whats in the collection '{coll}' in the database '{db_name}':\n{coll_details}",
"\n\n====coll_contents====\n\n"
)
return f"Here is a sample of whats in the collection:\n{coll_details}"
@action(name="answer_question")
def answer_question(self,db_name:str,coll:str,Q:str,pipeline) -> List:
"""
Invoke this to answer a question related to the contents of MongoDB collection.
Args:
db_name (str): The name of the MongoDB database
coll (str): The name of the MongoDB collection
Q (str): The question that needs to be answered.
pipeline: A MongoDB aggregation pipeline that best answers the question
Returns:
str: The sample of whats in the collection
"""
logger.info(f"answer_question")
database = mongodb_client[db_name]
collection = database[coll]
if DEBUG:
print(pipeline)
print(
"\n\n====PIPELINE FOR "+Q+"====\n\n",
pipeline,
"\n\n====PIPELINE FOR "+Q+"====\n\n"
)
samples = list(collection.aggregate(pipeline))
if DEBUG:
print(
f"Here is the database result to support answering the question '{Q}'\nMongoDB Results:{samples}"
)
return f"Here is the database result to support answering the question '{Q}'\nMongoDB Results:{samples}"
def __init__(self, logger, public_key="",private_key=""):
super().__init__(logger)
self.api_base_url = "https://cloud.mongodb.com/api/atlas/v1.0"
def __call__(self, text):
print("\nUser:", text,"\n")
self.messages += [{"role": "user", "content":text}]
response = self.llm.create(messages=self.messages, actions = [
self.handle_mdb
],
orch={
self.handle_mdb.name:
self.atlas_contents
,
self.atlas_contents.name:
self.db_contents
,
self.db_contents.name:
self.coll_contents
,
self.coll_contents.name:
self.answer_question
,
self.answer_question.name: None
},
stream=False)
return response
def print_output(output):
print("\nAssistant:", output.choices[0].message.content,"\n")
agent = MongoDBUtility(logger)
#print_output(agent("Tell me what's in MongoDB Atlas."))
#print_output(agent("What are the collections in sample_mflix?"))
#print_output(agent("What is inside the movies collection?"))
#print_output(agent("What is the best movie?"))
#print_output(agent("What is the best Animation movie from the USA in the year 1999?"))
print_output(agent("Can you change your aggregation pipeline to check for Comedy and Animation movies?"))
@ranfysvalle02
Copy link
Author

Screenshot 2024-02-04 at 2 50 56 AM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment