Skip to content

Instantly share code, notes, and snippets.

@ranfysvalle02
Created February 10, 2024 01:38
Show Gist options
  • Save ranfysvalle02/d3c29ba27f2f6cf3dcee7bf5907a86e3 to your computer and use it in GitHub Desktop.
Save ranfysvalle02/d3c29ba27f2f6cf3dcee7bf5907a86e3 to your computer and use it in GitHub Desktop.
import logging
import os
import openai
import itertools
import inspect
from actionweaver import action
from typing import List
from pymongo import MongoClient
import json
import inspect
import requests
OSO_authToken = ""
DEMO_USER = "[email protected]"
def checkOSO():
apiUrl = 'https://cloud.osohq.com/api/list'
requestData = {
'actor_type': 'User',
'actor_id': DEMO_USER,
'action': 'view',
'resource_type': 'Function',
'context_facts': []
}
try:
response = requests.post(apiUrl, json=requestData, headers={
'Authorization': f'Bearer {OSO_authToken}',
'Accept': 'application/json',
'Content-Type': 'application/json'
})
return response.json()
except Exception as error:
print('Error:', error)
return error
accessList = checkOSO()
connection_string=""
mongodb_client = MongoClient(connection_string)
DEBUG=True
from actionweaver.llms.azure.chat import ChatCompletion
from actionweaver.utils.tokens import TokenUsageTracker
from actionweaver import action
logging.basicConfig(
filename='agent.log',
filemode='a',
format='%(asctime)s.%(msecs)04d %(levelname)s {%(module)s} [%(funcName)s] %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
class AzureAgent:
def __init__(self, logger):
self.logger = logger
self.token_tracker = TokenUsageTracker(budget=None)
self.token_budget = 2000
self.llm = ChatCompletion(
model="gpt-4", azure_deployment="gpt-4",
azure_endpoint="", api_key="",
api_version="2023-10-01-preview",
token_usage_tracker = TokenUsageTracker(budget=self.token_budget),
logger=logger)
self.messages = [
{"role": "system", "content": "You are a resourceful AI assistant, Your specialization is answering questions about MongoDB Atlas environments."},
{"role": "system", "content": "Think critically and step by step. You already have access to the MongoDB Atlas environment, and have the tools at your disposal to answer the question."},
{"role":"system", "content":" [IMPORTANT: NEVER ANSWER DIRECTLY! ALWAYS USE YOUR AVAILABLE TOOLS!]"},
{"role":"assistant", "content":" [IMPORTANT: I WILL NEVER ANSWER DIRECTLY! I WILL ALWAYS USE MY AVAILABLE TOOLS!]"},
]
self.times = []
class MongoDBUtility(AzureAgent):
def truncate_string(self,s, max_length=1):
return s[:max_length]
def round_float(self,f, decimal_places=1):
return round(f, decimal_places)
def limit_list(self,l, max_length=1):
return l[:max_length]
def limit_dict(self,d, max_keys=5):
return {k: d[k] for k in list(d.keys())[:max_keys]}
def process_value(self,value):
if isinstance(value, str):
return self.truncate_string(value)
elif isinstance(value, float):
return self.round_float(value)
elif isinstance(value, list):
return self.limit_list(value)
elif isinstance(value, dict):
return self.limit_dict(value)
else:
return value
def check_acl(self,functionName):
#functionName = (inspect.stack()[0][3]) #fn name
#https://stackoverflow.com/questions/5067604/determine-function-name-from-within-that-function-without-using-traceback
if accessList['results'] and len(accessList['results']) > 0 and (functionName in accessList['results']) :
print("Access granted")
print("functionName: "+functionName)
return True
else:
print("Access denied: "+functionName)
return False
@action(name="handle_mdb")
def handle_mdb(self, userprompt: str) -> str:
"""
Invoke this to respond to every user prompt.
Args:
userprompt (str): The user's prompt
Returns:
str: The userprompt
"""
functionName = (inspect.stack()[0][3]) #fn name
if not self.check_acl(str(functionName)):
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName))
exit()
# else
return userprompt
@action(name="atlas_contents")
def atlas_contents(self,instruction:str) -> List:
"""
Invoke this if you need to get the contents of a MongoDB Atlas cluster.
Args:
instruction (str): The user's instruction
Returns:
str: The available MongoDB databases
"""
functionName = str(inspect.stack()[0][3]) #fn name
if not self.check_acl(str(functionName)):
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName))
exit()
logger.info(f"atlas_contents")
db_dict = {}
db_names = mongodb_client.list_database_names()
db_dict['database_names'] = db_names
if DEBUG:
print(
"\n\n====atlas_contents====\n\n",
f"Here are the available MongoDB databases:\n{db_dict}",
"\n\n====atlas_contents====\n\n"
)
return f"Here are the available MongoDB databases:\n{db_dict}"
@action(name="db_contents")
def db_contents(self,db_name:str) -> List:
"""
Invoke this if you need to get the contents of a MongoDB database.
Args:
db_name (str): The name of the MongoDB database
Returns:
str: the available MongoDB collections in the requested database.
"""
functionName = str(inspect.stack()[0][3]) #fn name
if not self.check_acl(str(functionName)):
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName))
exit()
logger.info(f"db_contents")
database = mongodb_client[db_name]
coll_names = database.list_collection_names()
if DEBUG:
print(
"\n\n====db_contents====\n\n",
f"Here are the available MongoDB collections in the requested database:\n{coll_names}",
"\n\n====db_contents====\n\n"
)
return f"Here are the available MongoDB collections in the requested database:\n{coll_names}"
@action(name="coll_contents")
def coll_contents(self,db_name:str,coll:str) -> List:
"""
Invoke this if you need to directly access the contents of MongoDB collection.
Args:
db_name (str): The name of the MongoDB database
coll (str): The name of the MongoDB collection
Returns:
str: The sample of whats in the collection
"""
functionName = str(inspect.stack()[0][3]) #fn name
if not self.check_acl(str(functionName)):
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName))
exit()
logger.info(f"coll_contents")
database = mongodb_client[db_name]
collection = database[coll]
# Sample the collection
pipeline = [{"$sample": {"size": 1}}]
samples = list(collection.aggregate(pipeline))
# Convert the samples to a minimal JSON object
minimal_samples = []
for sample in samples:
minimal_sample = {}
for key, value in sample.items():
if isinstance(value, (str, int, float, bool, list, dict, type(None))):
minimal_sample[key] = self.process_value(value)
else:
minimal_sample[key] = str(value)
minimal_samples.append(minimal_sample)
coll_details = json.dumps(minimal_samples, default=str)
if DEBUG:
print(
"\n\n====coll_contents====\n\n",
f"Here is whats in the collection '{coll}' in the database '{db_name}':\n{coll_details}",
"\n\n====coll_contents====\n\n"
)
return f"Here is a sample of whats in the collection:\n{coll_details}"
@action(name="answer_question")
def answer_question(self,db_name:str,coll:str,Q:str,pipeline) -> List:
"""
Invoke this to answer a question related to the contents of MongoDB collection.
Args:
db_name (str): The name of the MongoDB database
coll (str): The name of the MongoDB collection
Q (str): The question that needs to be answered.
pipeline: A MongoDB aggregation pipeline that best answers the question
Returns:
str: The sample of whats in the collection
"""
functionName = str(inspect.stack()[0][3]) #fn name
if not self.check_acl(str(functionName)):
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName))
exit()
logger.info(f"answer_question")
database = mongodb_client[db_name]
collection = database[coll]
if DEBUG:
print(pipeline)
print(
"\n\n====PIPELINE FOR "+Q+"====\n\n",
pipeline,
"\n\n====PIPELINE FOR "+Q+"====\n\n"
)
samples = list(collection.aggregate(pipeline))
if DEBUG:
print(
f"Here is the database result to support answering the question '{Q}'\nMongoDB Results:{samples}"
)
return f"Here is the database result to support answering the question '{Q}'\nMongoDB Results:{samples}"
def __init__(self, logger, public_key="",private_key=""):
super().__init__(logger)
self.api_base_url = "https://cloud.mongodb.com/api/atlas/v1.0"
def __call__(self, text):
print("\nUser:", text,"\n")
self.messages += [{"role": "user", "content":text}]
response = self.llm.create(messages=self.messages, actions = [
self.handle_mdb
],
orch={
self.handle_mdb.name:
self.atlas_contents
,
self.atlas_contents.name:
self.db_contents
,
self.db_contents.name:
self.coll_contents
,
self.coll_contents.name:
self.answer_question
,
self.answer_question.name: None
},
stream=False)
return response
def print_output(output):
print("\nAssistant:", output.choices[0].message.content,"\n")
agent = MongoDBUtility(logger)
#print_output(agent("Tell me what's in MongoDB Atlas."))
#print_output(agent("What are the collections in sample_mflix?"))
#print_output(agent("What is inside the movies collection?"))
#print_output(agent("What is the best movie?"))
#print_output(agent("What is the best Animation movie from the USA in the year 1999?"))
#print_output(agent("What city has the most theaters"))
print_output(agent("What movie has the most comments? use $lookup for the movie id"))
@ranfysvalle02
Copy link
Author

fabian.valle-simmons@M-DXTQ0WF2T4 lab % python3 chatlas.py

User: What movie has the most comments? use $lookup for the movie id

Access granted
functionName: handle_mdb
Access granted
functionName: atlas_contents

====atlas_contents====

Here are the available MongoDB databases:
{'database_names': ['big_agi', 'sample_mflix', 'admin', 'local']}

====atlas_contents====

Access granted
functionName: db_contents

====db_contents====

Here are the available MongoDB collections in the requested database:
['movies', 'users', 'sessions', 'theaters', 'comments', 'embedded_movies']

====db_contents====

Access granted
functionName: coll_contents

====coll_contents====

Here is whats in the collection 'comments' in the database 'sample_mflix':
[{"_id": "5a9427658b0beebeb6977591", "name": "M", "email": "m", "movie_id": "573a13bff29313caabd5f080", "text": "N", "date": "2014-01-22 00:15:25"}]

====coll_contents====

Access denied: answer_question
FATAL ERROR - NO ACCESS TO FUNCTION: answer_question

@ranfysvalle02
Copy link
Author

Screenshot 2024-02-09 at 8 32 07 PM

@ranfysvalle02
Copy link
Author

Similar Strategy to Protect Atlas App Services: Functions

https://gist.github.com/ranfysvalle02/6833747e9feee4b2c27531b457a8e0ad

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment