Skip to content

Instantly share code, notes, and snippets.

@SaschaHeyer
Created February 19, 2025 09:01
Show Gist options
  • Save SaschaHeyer/677d4db3dc8cc07bd7444191b5eee41a to your computer and use it in GitHub Desktop.
Save SaschaHeyer/677d4db3dc8cc07bd7444191b5eee41a to your computer and use it in GitHub Desktop.
agent
import vertexai
from vertexai.generative_models import (
GenerativeModel,
Tool,
FunctionDeclaration,
)
from vertexai.preview import reasoning_engines
from google.cloud import bigquery, firestore
from langchain_google_firestore import FirestoreChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core import prompts
from langchain.agents.format_scratchpad.tools import format_to_tool_messages
# Vertex AI Configuration
# Project, region and staging bucket handed to the Vertex AI SDK below.
PROJECT_ID = "sascha-playground-doit"
LOCATION = "us-central1"
STAGING_BUCKET = "gs://doit-llm"

vertexai.init(
    project=PROJECT_ID,
    location=LOCATION,
    staging_bucket=STAGING_BUCKET,
)

# Initialize BigQuery Client
# One client shared by every tool function in this script. No project is
# passed explicitly here.
bq_client = bigquery.Client()
# 🔹 Firestore Chat History for Context
def get_session_history(session_id: str):
    """Build the Firestore-backed chat history object for one session.

    A new Firestore client for ``PROJECT_ID`` is created on every call.

    Args:
        session_id (str): Session identifier passed through to the history
            object.

    Returns:
        A ``FirestoreChatMessageHistory`` configured for the ``history``
        collection with ``encode_message`` disabled.
    """
    history_settings = {
        "client": firestore.Client(project=PROJECT_ID),
        "session_id": session_id,
        "collection": "history",
        "encode_message": False,
    }
    return FirestoreChatMessageHistory(**history_settings)
# 🔹 Function: List Datasets
# NOTE(review): this function is handed to the agent as a tool further down;
# its docstring is presumably what the model sees as the tool description, so
# the wording is kept unchanged — confirm before editing it.
def list_datasets_func() -> dict:
    """
    Get a list of datasets that will help answer the user's question
    """
    dataset_ids = []
    for dataset in bq_client.list_datasets():
        dataset_ids.append(dataset.dataset_id)
    return {"datasets": dataset_ids}
# 🔹 Function: List Tables in a Dataset
# NOTE(review): the docstring is presumably surfaced to the model as the tool
# description, so its wording is kept unchanged — confirm before editing it.
def list_tables_func(dataset_id: str) -> dict:
    """List tables in a dataset that will help answer the user's question
    Args:
    dataset_id (str): Dataset ID to fetch tables from.
    """
    table_ids = []
    for table in bq_client.list_tables(dataset_id):
        table_ids.append(table.table_id)
    # Returned under the "tables" key as a plain list of table IDs.
    return {"tables": table_ids}
# 🔹 Function: Get Table Schema & Metadata
# NOTE(review): the docstring is presumably surfaced to the model as the tool
# description, so its wording is kept unchanged — confirm before editing it.
def get_table_func(table_id: str) -> dict:
    """Get information about a table, including the description, schema, and number of rows that will help answer the user's question. Always use the fully qualified dataset and table names.
    Args:
    table_id (str): Fully qualified ID of the table to get information about
    """
    table = bq_client.get_table(table_id)

    # Fall back to a fixed placeholder when the table has no description.
    description = table.description
    if not description:
        description = "No description available"

    # Only the column names are reported, not their types.
    column_names = [field.name for field in table.schema]

    return {
        "description": description,
        "schema": column_names,
        "num_rows": table.num_rows,
    }
# 🔹 Function: Execute SQL Query
# NOTE(review): the docstring is presumably surfaced to the model as the tool
# description, so its wording is kept unchanged — confirm before editing it.
def sql_query_func(query: str) -> dict:
    """Get information from data in BigQuery using SQL queries
    Args:
    query (str):SQL query on a single line that will help give quantitative answers to the user's question when run on a BigQuery dataset and table. In the SQL query, always use the fully qualified dataset and table names..
    """
    # 100MB limit
    job_config = bigquery.QueryJobConfig(maximum_bytes_billed=100_000_000)
    try:
        rows = bq_client.query(query, job_config=job_config).result()
        results = [dict(row) for row in rows]
    except Exception as e:
        # Any failure is reported back to the caller as data instead of being
        # raised, so the result always is a dict.
        return {"error": str(e)}
    return {"results": results}
# 🔹 System Prompt for Agent
system_prompt = """
Please give a concise, high-level summary followed by detail in
plain language about where the information in your response is
coming from in the database. Only use information that you learn
from BigQuery, do not make up information.
"""

# 🔹 Define Chat Prompt Template
# Neither template defined here is currently passed to the agent: the
# matching arguments are commented out in the agent definition below.
_chat_messages = [
    ("system", system_prompt),
    #MessagesPlaceholder(variable_name="history"),
    ("user", "{input}"),
]
chat_prompt = ChatPromptTemplate.from_messages(_chat_messages)

# Maps the agent's raw inputs onto the variables the template below expects.
_prompt_inputs = {
    "user_input": lambda inputs: inputs["input"],
    "history": lambda inputs: inputs["history"],
    "agent_scratchpad": lambda inputs: format_to_tool_messages(
        inputs["intermediate_steps"]
    ),
}
_conversation_template = ChatPromptTemplate.from_messages(
    [
        ("placeholder", "{history}"),
        ("user", "{user_input}"),
        ("placeholder", "{agent_scratchpad}"),
    ]
)
custom_prompt_template = _prompt_inputs | _conversation_template
# 🔹 Define Agent with Reasoning Engine
# The four BigQuery helper functions are exposed to the model as tools, and
# the executor is asked to return its intermediate steps with each answer.
agent = reasoning_engines.LangchainAgent(
    model="gemini-1.5-pro",
    model_kwargs={"temperature": 0},
    tools=[
        list_datasets_func,
        list_tables_func,
        get_table_func,
        sql_query_func,
    ],
    agent_executor_kwargs={"return_intermediate_steps": True},
    # Optional arguments that are currently switched off:
    #prompt=chat_prompt,
    #system_instruction=system_prompt,
    #prompt=custom_prompt_template,
    #chat_history=get_session_history,
)
# 🔹 Deploy Agent as a Remote Reasoning Engine
# Remote deployment is disabled; the local agent object is queried directly.
#remote_agent = reasoning_engines.ReasoningEngine.create(
#    agent,
#    requirements=[
#        "google-cloud-aiplatform[langchain,reasoningengine]",
#        "langchain-google-firestore",
#    ],
#)
remote_agent = agent

# Other sample questions that can be sent the same way:
#response = remote_agent.query(
#    input="What datasets are available in BigQuery?",
#)
#print(response["output"])
#response = remote_agent.query(
#    input="What tables exist in the dataset 'thelook_ecommerce'?",
#)
#print(response["output"])
#response = remote_agent.query(
#    input="What are the columns in 'thelook_ecommerce.orders'?",
#)
#print(response["output"])

# Runs as soon as the script is executed and prints only the final answer.
question = "Which product categories have the highest profit margins? Calculate it"
response = remote_agent.query(input=question)
#print(response)
print(response["output"])
# pylint: disable=broad-exception-caught,invalid-name
import time
from google import genai
from google.cloud import bigquery
from google.genai.types import FunctionDeclaration, GenerateContentConfig, Part, Tool
import streamlit as st
#BIGQUERY_DATASET_ID = "thelook_ecommerce"
MODEL_ID = "gemini-1.5-pro"
LOCATION = "us-central1"


def _single_string_param(param_name, param_description):
    """Return an object schema with exactly one required string property."""
    return {
        "type": "object",
        "properties": {
            param_name: {
                "type": "string",
                "description": param_description,
            }
        },
        "required": [
            param_name,
        ],
    }


# Declarations of the four functions the model may ask the app to run.
# The names given here are the ones the chat handler dispatches on.
list_datasets_func = FunctionDeclaration(
    name="list_datasets",
    description="Get a list of datasets that will help answer the user's question",
    # Takes no arguments, hence the empty property map.
    parameters={
        "type": "object",
        "properties": {},
    },
)

list_tables_func = FunctionDeclaration(
    name="list_tables",
    description="List tables in a dataset that will help answer the user's question",
    parameters=_single_string_param(
        "dataset_id",
        "Dataset ID to fetch tables from.",
    ),
)

get_table_func = FunctionDeclaration(
    name="get_table",
    description="Get information about a table, including the description, schema, and number of rows that will help answer the user's question. Always use the fully qualified dataset and table names.",
    parameters=_single_string_param(
        "table_id",
        "Fully qualified ID of the table to get information about",
    ),
)

sql_query_func = FunctionDeclaration(
    name="sql_query",
    description="Get information from data in BigQuery using SQL queries",
    parameters=_single_string_param(
        "query",
        "SQL query on a single line that will help give quantitative answers to the user's question when run on a BigQuery dataset and table. In the SQL query, always use the fully qualified dataset and table names.",
    ),
)

# Single tool bundling all four declarations for the chat configuration.
_declarations = [
    list_datasets_func,
    list_tables_func,
    get_table_func,
    sql_query_func,
]
sql_query_tool = Tool(function_declarations=_declarations)
# Gen AI client created with vertexai=True for the configured location.
client = genai.Client(vertexai=True, location=LOCATION)

st.set_page_config(page_title="BigQuery Analyst Agent", layout="wide")

# Markdown bullet list shown in the "Sample prompts" expander.
_SAMPLE_PROMPTS = """
- What kind of information is in this database?
- What percentage of orders are returned?
- How is inventory distributed across our regional distribution centers?
- Do customers typically place more than one order?
- Which product categories have the highest profit margins?
"""

col1, col2 = st.columns([8, 1])
with col1:
    st.title("BigQuery Analyst Agent")

# NOTE(review): the source lost its indentation; the expander is placed at
# page level here, outside the title column — confirm this matches the intent.
with st.expander("Sample prompts", expanded=True):
    st.write(_SAMPLE_PROMPTS)
# Create the per-session transcript the first time the script runs.
if "messages" not in st.session_state:
    st.session_state.messages = []

# Replay every stored message so the conversation is redrawn on each run.
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        # Escape "$" so Streamlit's markdown does not treat it as LaTeX.
        st.markdown(message["content"].replace("$", r"\$"))  # noqa: W605
        # Only assistant answers carry tool-call details. Check for the key
        # before creating the expander: catching KeyError around the `with`
        # would already have rendered an empty expander for other messages.
        if "backend_details" in message:
            with st.expander("Function calls, parameters, and responses"):
                st.markdown(message["backend_details"])
# Handle one chat turn: send the user's prompt to the model, run every
# BigQuery function the model asks for, feed the results back, and render the
# final answer together with a log of the function calls.
if prompt := st.chat_input("Ask me about information in the database..."):
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""
        chat = client.chats.create(
            model=MODEL_ID,
            config=GenerateContentConfig(temperature=0, tools=[sql_query_tool]),
        )
        # Kept under its own name: assigning to `client` would replace the
        # module-level Gen AI client with a BigQuery client.
        bigquery_client = bigquery.Client()

        # Answer-style instructions are appended to the question sent to the
        # model; the transcript stored above keeps the unmodified prompt.
        prompt += """
Please give a concise, high-level summary followed by detail in
plain language about where the information in your response is
coming from in the database. Only use information that you learn
from BigQuery, do not make up information.
"""

        try:
            response = chat.send_message(prompt)
            response = response.candidates[0].content.parts[0]
            print(response)

            # One [function name, parameters, logged response] entry per call.
            api_requests_and_responses = []
            backend_details = ""

            # Keep answering function calls until the model returns a part
            # without one. The check is explicit so that an unrelated
            # AttributeError inside the loop is reported instead of silently
            # ending the conversation.
            while True:
                function_call = getattr(response, "function_call", None)
                if function_call is None or not function_call.name:
                    break

                function_name = function_call.name
                # A call without arguments may carry no args mapping at all.
                params = dict(function_call.args or {})
                print(function_name)
                print(params)

                # `api_response` goes back to the model; `logged_response` is
                # what the details panel shows (they differ for get_table).
                if function_name == "list_datasets":
                    api_response = [
                        dataset.dataset_id
                        for dataset in bigquery_client.list_datasets()
                    ]
                    #api_response = BIGQUERY_DATASET_ID
                    logged_response = api_response
                elif function_name == "list_tables":
                    tables = bigquery_client.list_tables(params["dataset_id"])
                    api_response = str([table.table_id for table in tables])
                    logged_response = api_response
                elif function_name == "get_table":
                    table_repr = bigquery_client.get_table(
                        params["table_id"]
                    ).to_api_repr()
                    # The panel shows only description and column names; the
                    # model receives the full table representation.
                    logged_response = [
                        str(table_repr.get("description", "")),
                        str(
                            [
                                column["name"]
                                for column in table_repr["schema"]["fields"]
                            ]
                        ),
                    ]
                    api_response = str(table_repr)
                elif function_name == "sql_query":
                    job_config = bigquery.QueryJobConfig(
                        maximum_bytes_billed=100000000
                    )  # Data limit per query job
                    try:
                        # Strip newlines (literal and escaped) and backslashes
                        # from the generated SQL before running it.
                        cleaned_query = (
                            params["query"]
                            .replace("\\n", " ")
                            .replace("\n", "")
                            .replace("\\", "")
                        )
                        query_job = bigquery_client.query(
                            cleaned_query, job_config=job_config
                        )
                        rows = query_job.result()
                        api_response = str([dict(row) for row in rows])
                        api_response = api_response.replace("\\", "").replace(
                            "\n", ""
                        )
                    except Exception as e:
                        # A failed query is shown to the user, stored in the
                        # transcript, and also returned to the model so it
                        # can react to the error.
                        error_message = f"""
We're having trouble running this SQL query. This
could be due to an invalid query or the structure of
the data. Try rephrasing your question to help the
model generate a valid query. Details:
{str(e)}"""
                        st.error(error_message)
                        api_response = error_message
                        st.session_state.messages.append(
                            {
                                "role": "assistant",
                                "content": error_message,
                            }
                        )
                    logged_response = api_response
                else:
                    # Never reuse a previous iteration's result for a function
                    # this app does not implement; tell the model instead.
                    api_response = f"Unknown function: {function_name}"
                    logged_response = api_response

                api_requests_and_responses.append(
                    [function_name, params, logged_response]
                )
                print(api_response)

                response = chat.send_message(
                    Part.from_function_response(
                        name=function_name,
                        response={
                            "content": api_response,
                        },
                    ),
                )
                response = response.candidates[0].content.parts[0]

                # Append this call to the running markdown log and show it
                # while the model continues working.
                logged_name, logged_params, logged_result = (
                    api_requests_and_responses[-1]
                )
                backend_details += (
                    "- Function call:\n"
                    f" - Function name: ```{logged_name}```\n\n"
                    f" - Function parameters: ```{logged_params}```\n\n"
                    f" - API response: ```{logged_result}```\n\n"
                )
                with message_placeholder.container():
                    st.markdown(backend_details)

            # NOTE(review): fixed pause kept from the original; its purpose is
            # not evident from this file.
            time.sleep(3)

            full_response = response.text
            with message_placeholder.container():
                # Escape "$" so Streamlit's markdown does not treat it as LaTeX.
                st.markdown(full_response.replace("$", r"\$"))  # noqa: W605
                with st.expander("Function calls, parameters, and responses:"):
                    st.markdown(backend_details)

            st.session_state.messages.append(
                {
                    "role": "assistant",
                    "content": full_response,
                    "backend_details": backend_details,
                }
            )
        except Exception as e:
            # Top-level boundary for the turn: report the failure in the UI
            # and keep it in the transcript instead of crashing the app.
            print(e)
            error_message = f"""
Something went wrong! We encountered an unexpected error while
trying to process your request. Please try rephrasing your
question. Details:
{str(e)}"""
            st.error(error_message)
            st.session_state.messages.append(
                {
                    "role": "assistant",
                    "content": error_message,
                }
            )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment