Created
February 4, 2024 07:20
-
-
Save ranfysvalle02/8bce6520ce76924a2cc07ecdd08d46ee to your computer and use it in GitHub Desktop.
Chat with your MongoDB Atlas environment in a unique way
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import openai | |
import itertools | |
import inspect | |
from actionweaver import action | |
from typing import List | |
from pymongo import MongoClient | |
import json | |
connection_string="mongodb+srv://<user>:<password>@cluster0.<xxx>.mongodb.net/test" | |
mongodb_client = MongoClient(connection_string) | |
DEBUG=True | |
from actionweaver.llms.azure.chat import ChatCompletion | |
from actionweaver.utils.tokens import TokenUsageTracker | |
from actionweaver import action | |
logging.basicConfig( | |
filename='agent.log', | |
filemode='a', | |
format='%(asctime)s.%(msecs)04d %(levelname)s {%(module)s} [%(funcName)s] %(message)s', | |
level=logging.INFO, | |
datefmt='%Y-%m-%d %H:%M:%S' | |
) | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.DEBUG) | |
class AzureAgent: | |
def __init__(self, logger): | |
self.logger = logger | |
self.token_tracker = TokenUsageTracker(budget=None) | |
self.token_budget = 2000 | |
self.llm = ChatCompletion( | |
model="gpt-4", azure_deployment="gpt-4", | |
azure_endpoint="https://<demo>.openai.azure.com/", api_key="<api-key-here>", | |
api_version="2023-10-01-preview", | |
token_usage_tracker = TokenUsageTracker(budget=self.token_budget), | |
logger=logger) | |
self.messages = [ | |
{"role": "system", "content": "You are a resourceful AI assistant, Your specialization is answering questions about MongoDB Atlas environments."}, | |
{"role": "system", "content": "Think critically and step by step. You already have access to the MongoDB Atlas environment, and have the tools at your disposal to answer the question."}, | |
{"role":"system", "content":" [IMPORTANT: NEVER ANSWER DIRECTLY! ALWAYS USE YOUR AVAILABLE TOOLS!]"}, | |
{"role":"assistant", "content":" [IMPORTANT: I WILL NEVER ANSWER DIRECTLY! I WILL ALWAYS USE MY AVAILABLE TOOLS!]"}, | |
] | |
self.times = [] | |
class MongoDBUtility(AzureAgent): | |
def truncate_string(self,s, max_length=1): | |
return s[:max_length] | |
def round_float(self,f, decimal_places=1): | |
return round(f, decimal_places) | |
def limit_list(self,l, max_length=1): | |
return l[:max_length] | |
def limit_dict(self,d, max_keys=5): | |
return {k: d[k] for k in list(d.keys())[:max_keys]} | |
def process_value(self,value): | |
if isinstance(value, str): | |
return self.truncate_string(value) | |
elif isinstance(value, float): | |
return self.round_float(value) | |
elif isinstance(value, list): | |
return self.limit_list(value) | |
elif isinstance(value, dict): | |
return self.limit_dict(value) | |
else: | |
return value | |
@action(name="handle_mdb") | |
def handle_mdb(self, userprompt: str) -> str: | |
""" | |
Invoke this to respond to every user prompt. | |
Args: | |
userprompt (str): The user's prompt | |
Returns: | |
str: The userprompt | |
""" | |
return userprompt | |
@action(name="atlas_contents") | |
def atlas_contents(self,instruction:str) -> List: | |
""" | |
Invoke this if you need to get the contents of a MongoDB Atlas cluster. | |
Args: | |
instruction (str): The user's instruction | |
Returns: | |
str: The available MongoDB databases | |
""" | |
logger.info(f"atlas_contents") | |
db_dict = {} | |
db_names = mongodb_client.list_database_names() | |
db_dict['database_names'] = db_names | |
if DEBUG: | |
print( | |
"\n\n====atlas_contents====\n\n", | |
f"Here are the available MongoDB databases:\n{db_dict}", | |
"\n\n====atlas_contents====\n\n" | |
) | |
return f"Here are the available MongoDB databases:\n{db_dict}" | |
@action(name="db_contents") | |
def db_contents(self,db_name:str) -> List: | |
""" | |
Invoke this if you need to get the contents of a MongoDB database. | |
Args: | |
db_name (str): The name of the MongoDB database | |
Returns: | |
str: the available MongoDB collections in the requested database. | |
""" | |
logger.info(f"db_contents") | |
database = mongodb_client[db_name] | |
coll_names = database.list_collection_names() | |
if DEBUG: | |
print( | |
"\n\n====db_contents====\n\n", | |
f"Here are the available MongoDB collections in the requested database:\n{coll_names}", | |
"\n\n====db_contents====\n\n" | |
) | |
return f"Here are the available MongoDB collections in the requested database:\n{coll_names}" | |
@action(name="coll_contents") | |
def coll_contents(self,db_name:str,coll:str) -> List: | |
""" | |
Invoke this if you need to directly access the contents of MongoDB collection. | |
Args: | |
db_name (str): The name of the MongoDB database | |
coll (str): The name of the MongoDB collection | |
Returns: | |
str: The sample of whats in the collection | |
""" | |
logger.info(f"coll_contents") | |
database = mongodb_client[db_name] | |
collection = database[coll] | |
# Sample the collection | |
pipeline = [{"$sample": {"size": 1}}] | |
samples = list(collection.aggregate(pipeline)) | |
# Convert the samples to a minimal JSON object | |
minimal_samples = [] | |
for sample in samples: | |
minimal_sample = {} | |
for key, value in sample.items(): | |
if isinstance(value, (str, int, float, bool, list, dict, type(None))): | |
minimal_sample[key] = self.process_value(value) | |
minimal_samples.append(minimal_sample) | |
coll_details = json.dumps(minimal_samples, default=str) | |
if DEBUG: | |
print( | |
"\n\n====coll_contents====\n\n", | |
f"Here is whats in the collection '{coll}' in the database '{db_name}':\n{coll_details}", | |
"\n\n====coll_contents====\n\n" | |
) | |
return f"Here is a sample of whats in the collection:\n{coll_details}" | |
@action(name="answer_question") | |
def answer_question(self,db_name:str,coll:str,Q:str,pipeline) -> List: | |
""" | |
Invoke this to answer a question related to the contents of MongoDB collection. | |
Args: | |
db_name (str): The name of the MongoDB database | |
coll (str): The name of the MongoDB collection | |
Q (str): The question that needs to be answered. | |
pipeline: A MongoDB aggregation pipeline that best answers the question | |
Returns: | |
str: The sample of whats in the collection | |
""" | |
logger.info(f"answer_question") | |
database = mongodb_client[db_name] | |
collection = database[coll] | |
if DEBUG: | |
print(pipeline) | |
print( | |
"\n\n====PIPELINE FOR "+Q+"====\n\n", | |
pipeline, | |
"\n\n====PIPELINE FOR "+Q+"====\n\n" | |
) | |
samples = list(collection.aggregate(pipeline)) | |
if DEBUG: | |
print( | |
f"Here is the database result to support answering the question '{Q}'\nMongoDB Results:{samples}" | |
) | |
return f"Here is the database result to support answering the question '{Q}'\nMongoDB Results:{samples}" | |
def __init__(self, logger, public_key="",private_key=""): | |
super().__init__(logger) | |
self.api_base_url = "https://cloud.mongodb.com/api/atlas/v1.0" | |
def __call__(self, text): | |
print("\nUser:", text,"\n") | |
self.messages += [{"role": "user", "content":text}] | |
response = self.llm.create(messages=self.messages, actions = [ | |
self.handle_mdb | |
], | |
orch={ | |
self.handle_mdb.name: | |
self.atlas_contents | |
, | |
self.atlas_contents.name: | |
self.db_contents | |
, | |
self.db_contents.name: | |
self.coll_contents | |
, | |
self.coll_contents.name: | |
self.answer_question | |
, | |
self.answer_question.name: None | |
}, | |
stream=False) | |
return response | |
def print_output(output): | |
print("\nAssistant:", output.choices[0].message.content,"\n") | |
agent = MongoDBUtility(logger) | |
#print_output(agent("Tell me what's in MongoDB Atlas.")) | |
#print_output(agent("What are the collections in sample_mflix?")) | |
#print_output(agent("What is inside the movies collection?")) | |
#print_output(agent("What is the best movie?")) | |
#print_output(agent("What is the best Animation movie from the USA in the year 1999?")) | |
print_output(agent("Can you change your aggregation pipeline to check for Comedy and Animation movies?")) |
Author
ranfysvalle02
commented
Feb 4, 2024

Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment