Created
February 19, 2025 09:01
-
-
Save SaschaHeyer/677d4db3dc8cc07bd7444191b5eee41a to your computer and use it in GitHub Desktop.
agent
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import vertexai | |
from vertexai.generative_models import ( | |
GenerativeModel, | |
Tool, | |
FunctionDeclaration, | |
) | |
from vertexai.preview import reasoning_engines | |
from google.cloud import bigquery, firestore | |
from langchain_google_firestore import FirestoreChatMessageHistory | |
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain_core import prompts | |
from langchain.agents.format_scratchpad.tools import format_to_tool_messages | |
# Vertex AI Configuration | |
# Google Cloud project, region and staging bucket used to initialise Vertex AI.
PROJECT_ID = "sascha-playground-doit"
LOCATION = "us-central1"
STAGING_BUCKET = "gs://doit-llm"

# Bind the Vertex AI SDK to the settings above before any agent is built.
vertexai.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=STAGING_BUCKET,
)

# Module-level BigQuery client shared by the tool functions defined below.
bq_client = bigquery.Client()
# 🔹 Firestore Chat History for Context | |
def get_session_history(session_id: str):
    """Return the Firestore-backed chat history for one session.

    Args:
        session_id (str): Identifier passed through to the history object.

    Returns:
        FirestoreChatMessageHistory: History bound to ``session_id`` and the
        ``history`` collection.
    """
    # A fresh Firestore client, scoped to PROJECT_ID, is created on every call.
    firestore_client = firestore.Client(project=PROJECT_ID)
    history_options = {
        "client": firestore_client,
        "session_id": session_id,
        "collection": "history",
        "encode_message": False,
    }
    return FirestoreChatMessageHistory(**history_options)
# 🔹 Function: List Datasets | |
def list_datasets_func() -> dict:
    """
    Get a list of datasets that will help answer the user's question
    """
    # NOTE(review): the docstring is presumably consumed by the agent framework
    # as this tool's description, so its wording is left untouched -- confirm
    # before rewording.
    # Returns a dict with a single "datasets" key holding the dataset IDs.
    return {"datasets": [ds.dataset_id for ds in bq_client.list_datasets()]}
# 🔹 Function: List Tables in a Dataset | |
def list_tables_func(dataset_id: str) -> dict:
    """List tables in a dataset that will help answer the user's question

    Args:
        dataset_id (str): Dataset ID to fetch tables from.
    """
    # NOTE(review): the docstring is presumably consumed by the agent framework
    # as this tool's description, so its wording is left untouched -- confirm
    # before rewording.
    table_ids = []
    for entry in bq_client.list_tables(dataset_id):
        table_ids.append(entry.table_id)
    # Returns a dict with a single "tables" key holding the table IDs.
    return {"tables": table_ids}
# 🔹 Function: Get Table Schema & Metadata | |
def get_table_func(table_id: str) -> dict:
    """Get information about a table, including the description, schema, and number of rows that will help answer the user's question. Always use the fully qualified dataset and table names.

    Args:
        table_id (str): Fully qualified ID of the table to get information about
    """
    # NOTE(review): the docstring is presumably consumed by the agent framework
    # as this tool's description, so its wording is left untouched -- confirm
    # before rewording.
    table_info = bq_client.get_table(table_id)

    # Fall back to a fixed placeholder when the description is empty or None.
    description = table_info.description or "No description available"
    # Only the column names are reported, not their types or modes.
    column_names = [column.name for column in table_info.schema]

    return {
        "description": description,
        "schema": column_names,
        "num_rows": table_info.num_rows,
    }
# 🔹 Function: Execute SQL Query | |
def sql_query_func(query: str) -> dict:
    """Get information from data in BigQuery using SQL queries

    Args:
        query (str): SQL query on a single line that will help give quantitative answers to the user's question when run on a BigQuery dataset and table. In the SQL query, always use the fully qualified dataset and table names.
    """
    # NOTE(review): the docstring is presumably consumed by the agent framework
    # as this tool's description, so only its typos were fixed (missing space
    # after "(str):" and a doubled trailing period) -- confirm before rewording.

    # Cap the bytes a single query job may bill at 100 MB.
    job_config = bigquery.QueryJobConfig(maximum_bytes_billed=100_000_000)
    try:
        rows = bq_client.query(query, job_config=job_config).result()
        # Success: {"results": [...]} with one plain dict per returned row.
        return {"results": [dict(row) for row in rows]}
    except Exception as e:
        # Deliberately broad: any failure is handed back as {"error": "..."}
        # instead of being raised to the caller.
        return {"error": str(e)}
# 🔹 System Prompt for Agent | |
# Instruction text asking for a summary plus provenance, grounded in BigQuery.
system_prompt = """
Please give a concise, high-level summary followed by detail in
plain language about where the information in your response is
coming from in the database. Only use information that you learn
from BigQuery, do not make up information.
"""

# Simple two-message prompt: the system instruction followed by the user input.
# (A MessagesPlaceholder(variable_name="history") entry was left disabled.)
_chat_prompt_messages = [
    ("system", system_prompt),
    ("user", "{input}"),
]
chat_prompt = ChatPromptTemplate.from_messages(_chat_prompt_messages)

# Maps the incoming payload onto the variables the message layout expects.
_prompt_inputs = {
    "user_input": lambda payload: payload["input"],
    "history": lambda payload: payload["history"],
    "agent_scratchpad": lambda payload: format_to_tool_messages(
        payload["intermediate_steps"]
    ),
}
# Layout: prior history, then the user's message, then the tool scratchpad.
_prompt_layout = ChatPromptTemplate.from_messages(
    [
        ("placeholder", "{history}"),
        ("user", "{user_input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)
custom_prompt_template = _prompt_inputs | _prompt_layout
# 🔹 Define Agent with Reasoning Engine | |
# Python callables exposed to the agent as tools.
agent_tools = [
    list_datasets_func,
    list_tables_func,
    get_table_func,
    sql_query_func,
]

# Options left disabled here:
#   prompt=chat_prompt / prompt=custom_prompt_template
#   system_instruction=system_prompt
#   chat_history=get_session_history
agent = reasoning_engines.LangchainAgent(
    model="gemini-1.5-pro",
    model_kwargs={"temperature": 0},
    tools=agent_tools,
    agent_executor_kwargs={"return_intermediate_steps": True},
)

# Remote deployment is disabled; the agent is used in-process instead.
# remote_agent = reasoning_engines.ReasoningEngine.create(
#     agent,
#     requirements=[
#         "google-cloud-aiplatform[langchain,reasoningengine]",
#         "langchain-google-firestore",
#     ],
# )
remote_agent = agent

# Other sample questions that were tried (each followed by
# print(response["output"])):
#   remote_agent.query(input="What datasets are available in BigQuery?")
#   remote_agent.query(input="What tables exist in the dataset 'thelook_ecommerce'?")
#   remote_agent.query(input="What are the columns in 'thelook_ecommerce.orders'?")
question = "Which product categories have the highest profit margins? Calculate it"
response = remote_agent.query(input=question)

# print(response)
print(response["output"])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pylint: disable=broad-exception-caught,invalid-name | |
import time | |
from google import genai | |
from google.cloud import bigquery | |
from google.genai.types import FunctionDeclaration, GenerateContentConfig, Part, Tool | |
import streamlit as st | |
#BIGQUERY_DATASET_ID = "thelook_ecommerce" | |
MODEL_ID = "gemini-1.5-pro"
LOCATION = "us-central1"


def _single_string_param(name, description):
    """Build an object schema with exactly one required string property."""
    return {
        "type": "object",
        "properties": {
            name: {
                "type": "string",
                "description": description,
            }
        },
        "required": [
            name,
        ],
    }


# Takes no arguments, hence the empty properties map and no "required" list.
list_datasets_func = FunctionDeclaration(
    name="list_datasets",
    description="Get a list of datasets that will help answer the user's question",
    parameters={
        "type": "object",
        "properties": {},
    },
)

list_tables_func = FunctionDeclaration(
    name="list_tables",
    description="List tables in a dataset that will help answer the user's question",
    parameters=_single_string_param(
        "dataset_id",
        "Dataset ID to fetch tables from.",
    ),
)

get_table_func = FunctionDeclaration(
    name="get_table",
    description="Get information about a table, including the description, schema, and number of rows that will help answer the user's question. Always use the fully qualified dataset and table names.",
    parameters=_single_string_param(
        "table_id",
        "Fully qualified ID of the table to get information about",
    ),
)

sql_query_func = FunctionDeclaration(
    name="sql_query",
    description="Get information from data in BigQuery using SQL queries",
    parameters=_single_string_param(
        "query",
        "SQL query on a single line that will help give quantitative answers to the user's question when run on a BigQuery dataset and table. In the SQL query, always use the fully qualified dataset and table names.",
    ),
)

# One Tool bundling all four declarations; handed to the model in the chat config.
_declared_functions = [
    list_datasets_func,
    list_tables_func,
    get_table_func,
    sql_query_func,
]
sql_query_tool = Tool(function_declarations=_declared_functions)
# Gen AI client (Vertex AI backend) used to create chat sessions.
client = genai.Client(vertexai=True, location=LOCATION)

st.set_page_config(
    page_title="BigQuery Analyst Agent",
    layout="wide",
)

# Only the wide left column is used; col2 is kept so the layout is unchanged.
col1, col2 = st.columns([8, 1])
with col1:
    st.title("BigQuery Analyst Agent")

with st.expander("Sample prompts", expanded=True):
    st.write(
        """
- What kind of information is in this database?
- What percentage of orders are returned?
- How is inventory distributed across our regional distribution centers?
- Do customers typically place more than one order?
- Which product categories have the highest profit margins?
"""
    )

# The conversation lives in session state and is replayed below on each run.
if "messages" not in st.session_state:
    st.session_state.messages = []

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        # Escape "$" before handing the text to st.markdown.
        st.markdown(message["content"].replace("$", r"\$"))  # noqa: W605
        # Check for the key *before* opening the expander: the previous
        # try/except KeyError created the expander first, leaving an empty one
        # under every message that has no backend details (user and error
        # messages).
        if "backend_details" in message:
            with st.expander("Function calls, parameters, and responses"):
                st.markdown(message["backend_details"])
# Handle one user question: show it, let the model call BigQuery tools until it
# produces a text answer, then render and store that answer.
if prompt := st.chat_input("Ask me about information in the database..."):
    # The raw question is stored; the instruction suffix appended below is
    # sent to the model but not kept in the displayed history.
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""
        chat = client.chats.create(
            model=MODEL_ID,
            config=GenerateContentConfig(temperature=0, tools=[sql_query_tool]),
        )
        # Use a dedicated name so the module-level Gen AI ``client`` is not
        # rebound to a BigQuery client.
        bq_client = bigquery.Client()

        prompt += """
Please give a concise, high-level summary followed by detail in
plain language about where the information in your response is
coming from in the database. Only use information that you learn
from BigQuery, do not make up information.
"""

        try:
            response = chat.send_message(prompt)
            # Only the first part of the first candidate is inspected.
            response = response.candidates[0].content.parts[0]
            print(response)

            api_requests_and_responses = []
            backend_details = ""

            # Keep answering tool requests until the model stops asking for one.
            # The explicit check replaces the old "catch AttributeError to exit"
            # loop, which also swallowed unrelated AttributeErrors raised while
            # running a tool.
            while function_call := getattr(response, "function_call", None):
                tool_name = function_call.name
                # ``args`` may be absent for a parameterless call.
                params = dict(function_call.args or {})

                print(tool_name)
                print(params)

                # api_response is sent back to the model; logged_response is
                # what the "backend details" panel shows for this call.
                if tool_name == "list_datasets":
                    # Single listing call (it used to be issued twice).
                    api_response = [
                        dataset.dataset_id for dataset in bq_client.list_datasets()
                    ]
                    logged_response = api_response
                elif tool_name == "list_tables":
                    tables = bq_client.list_tables(params["dataset_id"])
                    api_response = str([table.table_id for table in tables])
                    logged_response = api_response
                elif tool_name == "get_table":
                    table_repr = bq_client.get_table(params["table_id"]).to_api_repr()
                    # The panel shows only description + column names, while
                    # the model receives the full table representation.
                    logged_response = [
                        str(table_repr.get("description", "")),
                        str(
                            [
                                column["name"]
                                for column in table_repr["schema"]["fields"]
                            ]
                        ),
                    ]
                    api_response = str(table_repr)
                elif tool_name == "sql_query":
                    job_config = bigquery.QueryJobConfig(
                        maximum_bytes_billed=100000000
                    )  # Data limit per query job
                    try:
                        # Strip escaped/literal newlines and stray backslashes
                        # from the generated SQL before running it.
                        cleaned_query = (
                            params["query"]
                            .replace("\\n", " ")
                            .replace("\n", "")
                            .replace("\\", "")
                        )
                        query_job = bq_client.query(
                            cleaned_query, job_config=job_config
                        )
                        api_response = str([dict(row) for row in query_job.result()])
                        api_response = api_response.replace("\\", "").replace(
                            "\n", ""
                        )
                    except Exception as e:
                        error_message = f"""
We're having trouble running this SQL query. This
could be due to an invalid query or the structure of
the data. Try rephrasing your question to help the
model generate a valid query. Details:
{str(e)}"""
                        st.error(error_message)
                        # The error text is also returned to the model as the
                        # tool result.
                        api_response = error_message
                        st.session_state.messages.append(
                            {
                                "role": "assistant",
                                "content": error_message,
                            }
                        )
                    logged_response = api_response
                else:
                    # Previously an unknown name fell through every ``if`` and
                    # reused a stale (or undefined) api_response.
                    api_response = f"Unknown function: {tool_name}"
                    logged_response = api_response

                api_requests_and_responses.append(
                    [tool_name, params, logged_response]
                )
                print(api_response)

                response = chat.send_message(
                    Part.from_function_response(
                        name=tool_name,
                        response={
                            "content": api_response,
                        },
                    ),
                )
                response = response.candidates[0].content.parts[0]

                # Append this call to the running markdown log and show it in
                # the placeholder while the loop continues.
                backend_details += (
                    "- Function call:\n"
                    f" - Function name: ```{tool_name}```\n\n"
                    f" - Function parameters: ```{params}```\n\n"
                    f" - API response: ```{logged_response}```\n\n"
                )
                with message_placeholder.container():
                    st.markdown(backend_details)

            # NOTE(review): fixed pause kept from the original; presumably it
            # leaves the call log visible briefly before it is replaced.
            time.sleep(3)

            full_response = response.text
            with message_placeholder.container():
                st.markdown(full_response.replace("$", r"\$"))  # noqa: W605
                with st.expander("Function calls, parameters, and responses:"):
                    st.markdown(backend_details)

            st.session_state.messages.append(
                {
                    "role": "assistant",
                    "content": full_response,
                    "backend_details": backend_details,
                }
            )
        except Exception as e:
            # Top-level boundary: report the failure in the UI and keep it in
            # the conversation history.
            print(e)
            error_message = f"""
Something went wrong! We encountered an unexpected error while
trying to process your request. Please try rephrasing your
question. Details:
{str(e)}"""
            st.error(error_message)
            st.session_state.messages.append(
                {
                    "role": "assistant",
                    "content": error_message,
                }
            )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment