Created
February 10, 2024 01:38
-
-
Save ranfysvalle02/d3c29ba27f2f6cf3dcee7bf5907a86e3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import openai | |
import itertools | |
import inspect | |
from actionweaver import action | |
from typing import List | |
from pymongo import MongoClient | |
import json | |
import inspect | |
import requests | |
OSO_authToken = "" | |
DEMO_USER = "[email protected]" | |
def checkOSO(): | |
apiUrl = 'https://cloud.osohq.com/api/list' | |
requestData = { | |
'actor_type': 'User', | |
'actor_id': DEMO_USER, | |
'action': 'view', | |
'resource_type': 'Function', | |
'context_facts': [] | |
} | |
try: | |
response = requests.post(apiUrl, json=requestData, headers={ | |
'Authorization': f'Bearer {OSO_authToken}', | |
'Accept': 'application/json', | |
'Content-Type': 'application/json' | |
}) | |
return response.json() | |
except Exception as error: | |
print('Error:', error) | |
return error | |
accessList = checkOSO() | |
connection_string="" | |
mongodb_client = MongoClient(connection_string) | |
DEBUG=True | |
from actionweaver.llms.azure.chat import ChatCompletion | |
from actionweaver.utils.tokens import TokenUsageTracker | |
from actionweaver import action | |
logging.basicConfig( | |
filename='agent.log', | |
filemode='a', | |
format='%(asctime)s.%(msecs)04d %(levelname)s {%(module)s} [%(funcName)s] %(message)s', | |
level=logging.INFO, | |
datefmt='%Y-%m-%d %H:%M:%S' | |
) | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.DEBUG) | |
class AzureAgent: | |
def __init__(self, logger): | |
self.logger = logger | |
self.token_tracker = TokenUsageTracker(budget=None) | |
self.token_budget = 2000 | |
self.llm = ChatCompletion( | |
model="gpt-4", azure_deployment="gpt-4", | |
azure_endpoint="", api_key="", | |
api_version="2023-10-01-preview", | |
token_usage_tracker = TokenUsageTracker(budget=self.token_budget), | |
logger=logger) | |
self.messages = [ | |
{"role": "system", "content": "You are a resourceful AI assistant, Your specialization is answering questions about MongoDB Atlas environments."}, | |
{"role": "system", "content": "Think critically and step by step. You already have access to the MongoDB Atlas environment, and have the tools at your disposal to answer the question."}, | |
{"role":"system", "content":" [IMPORTANT: NEVER ANSWER DIRECTLY! ALWAYS USE YOUR AVAILABLE TOOLS!]"}, | |
{"role":"assistant", "content":" [IMPORTANT: I WILL NEVER ANSWER DIRECTLY! I WILL ALWAYS USE MY AVAILABLE TOOLS!]"}, | |
] | |
self.times = [] | |
class MongoDBUtility(AzureAgent): | |
def truncate_string(self,s, max_length=1): | |
return s[:max_length] | |
def round_float(self,f, decimal_places=1): | |
return round(f, decimal_places) | |
def limit_list(self,l, max_length=1): | |
return l[:max_length] | |
def limit_dict(self,d, max_keys=5): | |
return {k: d[k] for k in list(d.keys())[:max_keys]} | |
def process_value(self,value): | |
if isinstance(value, str): | |
return self.truncate_string(value) | |
elif isinstance(value, float): | |
return self.round_float(value) | |
elif isinstance(value, list): | |
return self.limit_list(value) | |
elif isinstance(value, dict): | |
return self.limit_dict(value) | |
else: | |
return value | |
def check_acl(self,functionName): | |
#functionName = (inspect.stack()[0][3]) #fn name | |
#https://stackoverflow.com/questions/5067604/determine-function-name-from-within-that-function-without-using-traceback | |
if accessList['results'] and len(accessList['results']) > 0 and (functionName in accessList['results']) : | |
print("Access granted") | |
print("functionName: "+functionName) | |
return True | |
else: | |
print("Access denied: "+functionName) | |
return False | |
@action(name="handle_mdb") | |
def handle_mdb(self, userprompt: str) -> str: | |
""" | |
Invoke this to respond to every user prompt. | |
Args: | |
userprompt (str): The user's prompt | |
Returns: | |
str: The userprompt | |
""" | |
functionName = (inspect.stack()[0][3]) #fn name | |
if not self.check_acl(str(functionName)): | |
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName)) | |
exit() | |
# else | |
return userprompt | |
@action(name="atlas_contents") | |
def atlas_contents(self,instruction:str) -> List: | |
""" | |
Invoke this if you need to get the contents of a MongoDB Atlas cluster. | |
Args: | |
instruction (str): The user's instruction | |
Returns: | |
str: The available MongoDB databases | |
""" | |
functionName = str(inspect.stack()[0][3]) #fn name | |
if not self.check_acl(str(functionName)): | |
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName)) | |
exit() | |
logger.info(f"atlas_contents") | |
db_dict = {} | |
db_names = mongodb_client.list_database_names() | |
db_dict['database_names'] = db_names | |
if DEBUG: | |
print( | |
"\n\n====atlas_contents====\n\n", | |
f"Here are the available MongoDB databases:\n{db_dict}", | |
"\n\n====atlas_contents====\n\n" | |
) | |
return f"Here are the available MongoDB databases:\n{db_dict}" | |
@action(name="db_contents") | |
def db_contents(self,db_name:str) -> List: | |
""" | |
Invoke this if you need to get the contents of a MongoDB database. | |
Args: | |
db_name (str): The name of the MongoDB database | |
Returns: | |
str: the available MongoDB collections in the requested database. | |
""" | |
functionName = str(inspect.stack()[0][3]) #fn name | |
if not self.check_acl(str(functionName)): | |
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName)) | |
exit() | |
logger.info(f"db_contents") | |
database = mongodb_client[db_name] | |
coll_names = database.list_collection_names() | |
if DEBUG: | |
print( | |
"\n\n====db_contents====\n\n", | |
f"Here are the available MongoDB collections in the requested database:\n{coll_names}", | |
"\n\n====db_contents====\n\n" | |
) | |
return f"Here are the available MongoDB collections in the requested database:\n{coll_names}" | |
@action(name="coll_contents") | |
def coll_contents(self,db_name:str,coll:str) -> List: | |
""" | |
Invoke this if you need to directly access the contents of MongoDB collection. | |
Args: | |
db_name (str): The name of the MongoDB database | |
coll (str): The name of the MongoDB collection | |
Returns: | |
str: The sample of whats in the collection | |
""" | |
functionName = str(inspect.stack()[0][3]) #fn name | |
if not self.check_acl(str(functionName)): | |
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName)) | |
exit() | |
logger.info(f"coll_contents") | |
database = mongodb_client[db_name] | |
collection = database[coll] | |
# Sample the collection | |
pipeline = [{"$sample": {"size": 1}}] | |
samples = list(collection.aggregate(pipeline)) | |
# Convert the samples to a minimal JSON object | |
minimal_samples = [] | |
for sample in samples: | |
minimal_sample = {} | |
for key, value in sample.items(): | |
if isinstance(value, (str, int, float, bool, list, dict, type(None))): | |
minimal_sample[key] = self.process_value(value) | |
else: | |
minimal_sample[key] = str(value) | |
minimal_samples.append(minimal_sample) | |
coll_details = json.dumps(minimal_samples, default=str) | |
if DEBUG: | |
print( | |
"\n\n====coll_contents====\n\n", | |
f"Here is whats in the collection '{coll}' in the database '{db_name}':\n{coll_details}", | |
"\n\n====coll_contents====\n\n" | |
) | |
return f"Here is a sample of whats in the collection:\n{coll_details}" | |
@action(name="answer_question") | |
def answer_question(self,db_name:str,coll:str,Q:str,pipeline) -> List: | |
""" | |
Invoke this to answer a question related to the contents of MongoDB collection. | |
Args: | |
db_name (str): The name of the MongoDB database | |
coll (str): The name of the MongoDB collection | |
Q (str): The question that needs to be answered. | |
pipeline: A MongoDB aggregation pipeline that best answers the question | |
Returns: | |
str: The sample of whats in the collection | |
""" | |
functionName = str(inspect.stack()[0][3]) #fn name | |
if not self.check_acl(str(functionName)): | |
print("FATAL ERROR - NO ACCESS TO FUNCTION: "+str(functionName)) | |
exit() | |
logger.info(f"answer_question") | |
database = mongodb_client[db_name] | |
collection = database[coll] | |
if DEBUG: | |
print(pipeline) | |
print( | |
"\n\n====PIPELINE FOR "+Q+"====\n\n", | |
pipeline, | |
"\n\n====PIPELINE FOR "+Q+"====\n\n" | |
) | |
samples = list(collection.aggregate(pipeline)) | |
if DEBUG: | |
print( | |
f"Here is the database result to support answering the question '{Q}'\nMongoDB Results:{samples}" | |
) | |
return f"Here is the database result to support answering the question '{Q}'\nMongoDB Results:{samples}" | |
def __init__(self, logger, public_key="",private_key=""): | |
super().__init__(logger) | |
self.api_base_url = "https://cloud.mongodb.com/api/atlas/v1.0" | |
def __call__(self, text): | |
print("\nUser:", text,"\n") | |
self.messages += [{"role": "user", "content":text}] | |
response = self.llm.create(messages=self.messages, actions = [ | |
self.handle_mdb | |
], | |
orch={ | |
self.handle_mdb.name: | |
self.atlas_contents | |
, | |
self.atlas_contents.name: | |
self.db_contents | |
, | |
self.db_contents.name: | |
self.coll_contents | |
, | |
self.coll_contents.name: | |
self.answer_question | |
, | |
self.answer_question.name: None | |
}, | |
stream=False) | |
return response | |
def print_output(output): | |
print("\nAssistant:", output.choices[0].message.content,"\n") | |
agent = MongoDBUtility(logger) | |
#print_output(agent("Tell me what's in MongoDB Atlas.")) | |
#print_output(agent("What are the collections in sample_mflix?")) | |
#print_output(agent("What is inside the movies collection?")) | |
#print_output(agent("What is the best movie?")) | |
#print_output(agent("What is the best Animation movie from the USA in the year 1999?")) | |
#print_output(agent("What city has the most theaters")) | |
print_output(agent("What movie has the most comments? use $lookup for the movie id")) |
Similar Strategy to Protect Atlas App Services: Functions
https://gist.github.com/ranfysvalle02/6833747e9feee4b2c27531b457a8e0ad
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
fabian.valle-simmons@M-DXTQ0WF2T4 lab % python3 chatlas.py
User: What movie has the most comments? use $lookup for the movie id
Access granted
functionName: handle_mdb
Access granted
functionName: atlas_contents
====atlas_contents====
Here are the available MongoDB databases:
{'database_names': ['big_agi', 'sample_mflix', 'admin', 'local']}
====atlas_contents====
Access granted
functionName: db_contents
====db_contents====
Here are the available MongoDB collections in the requested database:
['movies', 'users', 'sessions', 'theaters', 'comments', 'embedded_movies']
====db_contents====
Access granted
functionName: coll_contents
====coll_contents====
Here is whats in the collection 'comments' in the database 'sample_mflix':
[{"_id": "5a9427658b0beebeb6977591", "name": "M", "email": "m", "movie_id": "573a13bff29313caabd5f080", "text": "N", "date": "2014-01-22 00:15:25"}]
====coll_contents====
Access denied: answer_question
FATAL ERROR - NO ACCESS TO FUNCTION: answer_question