Created
July 29, 2024 20:22
-
-
Save shivendrasoni/f6e5075a739bdb85c7f29728319420d4 to your computer and use it in GitHub Desktop.
A LangGraph agent that optimises SQLite database queries by observing a query log.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### | |
# It does not implement or enforce how you generate the logs.
# It assumes that the query logs are in a CSV file with the format:
# timestamp,query,execution_time | |
# 2023-07-30 14:30:15,SELECT * FROM users WHERE age > 30,0.123456 | |
# 2023-07-30 14:30:16,INSERT INTO orders (user_id, product_id, quantity) VALUES (1, 5, 2),0.054321 | |
### | |
import csv
import os
from typing import Dict, List, TypedDict

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.tools import BaseTool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.graph import Graph, StateGraph
from langgraph.prebuilt import ToolNode
# Assuming you have set your OpenAI API key in the environment variables.
# setdefault keeps a key that is already exported and only falls back to the
# placeholder when the variable is missing, so a real key is never overwritten.
os.environ.setdefault("OPENAI_API_KEY", "your-api-key-here")
# Tool for rewriting queries | |
class QueryRewriteTool(BaseTool):
    """Tool that marks a SQL query as optimized and records it in a file.

    The rewrite is a placeholder: the query text is only prefixed with an
    ``/* Optimized */`` comment and appended to ``optimized_queries.sql``
    (relative to the current working directory).
    """

    # Field overrides are annotated because Pydantic v2 based LangChain
    # releases reject un-annotated overrides of base-class fields.
    name: str = "query_rewrite"
    description: str = "Rewrites a SQL query to optimize its performance"

    def _run(self, query: str) -> str:
        """Tag *query* as optimized, append it to the output file, and report it.

        Args:
            query: The SQL statement to rewrite.

        Returns:
            A confirmation message that contains the rewritten query.
        """
        # In a real scenario, this would use more sophisticated logic to rewrite the query
        optimized_query = f"/* Optimized */ {query}"
        # Explicit encoding so the output does not depend on the platform default.
        with open("optimized_queries.sql", "a", encoding="utf-8") as f:
            f.write(f"{optimized_query}\n")
        return f"Query optimized and saved: {optimized_query}"
# Tool for creating indexes | |
class CreateIndexTool(BaseTool):
    """Tool that builds a ``CREATE INDEX`` statement for a table column.

    With ``auto_create`` disabled (the default) the statement is appended to
    ``index_suggestions.sql``. With it enabled the tool only reports the
    statement as created; no database is touched by this placeholder.
    """

    # Field overrides are annotated because Pydantic v2 based LangChain
    # releases reject un-annotated overrides of base-class fields.
    name: str = "create_index"
    description: str = "Suggests or creates an index for a given table and column"
    # Declared as a model field: BaseTool is a Pydantic model, so assigning an
    # undeclared attribute (or assigning before super().__init__()) raises.
    auto_create: bool = False

    def __init__(self, auto_create: bool = False, **kwargs):
        """Create the tool.

        Args:
            auto_create: When True, report the index as created instead of
                saving the statement as a suggestion.
            **kwargs: Forwarded unchanged to ``BaseTool``.
        """
        # Pass the value through the model constructor so it is validated and
        # stored like any other field.
        super().__init__(auto_create=auto_create, **kwargs)

    def _run(self, table: str, column: str) -> str:
        """Build the index statement for ``table(column)`` and report or save it.

        Args:
            table: Name of the table to index.
            column: Name of the column to index.

        Returns:
            A message containing the generated ``CREATE INDEX`` statement.
        """
        # NOTE(review): table and column are interpolated verbatim into SQL;
        # validate or quote them before ever executing the statement.
        index_suggestion = f"CREATE INDEX idx_{table}_{column} ON {table}({column});"
        if self.auto_create:
            # In a real scenario, this would actually create the index in the database
            return f"Index created: {index_suggestion}"
        # Explicit encoding so the output does not depend on the platform default.
        with open("index_suggestions.sql", "a", encoding="utf-8") as f:
            f.write(f"{index_suggestion}\n")
        return f"Index suggestion saved: {index_suggestion}"
# Function to parse the query log | |
def parse_query_log(log_file: str) -> List[Dict[str, object]]:
    """Read a query log CSV and return one record per logged query.

    The file is expected to start with a header row followed by rows of
    ``timestamp,query,execution_time``. The query may itself contain unquoted
    commas: the first field is taken as the timestamp, the last as the
    execution time, and everything in between is re-joined into the query.

    Args:
        log_file: Path to the CSV log file.

    Returns:
        A list of dicts with the keys ``"timestamp"`` (str), ``"query"`` (str)
        and ``"execution_time"`` (float). Empty for a file with no data rows.

    Raises:
        ValueError: If a non-blank row has fewer than three fields or its
            execution time is not a number.
    """
    queries = []
    # newline="" is what the csv module expects so that it can handle line
    # endings inside quoted fields itself.
    with open(log_file, "r", newline="", encoding="utf-8") as f:
        csv_reader = csv.reader(f)
        next(csv_reader, None)  # Skip header row; tolerate a completely empty file
        for row in csv_reader:
            if not row:
                # csv.reader yields [] for blank lines (e.g. a trailing newline).
                continue
            if len(row) < 3:
                raise ValueError(
                    f"{log_file}, line {csv_reader.line_num}: expected "
                    f"timestamp,query,execution_time but got {row!r}"
                )
            # Unquoted commas inside the SQL split the query into several
            # fields; joining the middle fields restores the original text.
            timestamp, *query_parts, execution_time = row
            queries.append({
                "timestamp": timestamp,
                "query": ",".join(query_parts),
                "execution_time": float(execution_time),
            })
    return queries
# Main optimizer function | |
def optimize_queries(state):
    """Graph node: ask an LLM agent for an optimization of every logged query.

    Reads ``query_log.csv`` via ``parse_query_log``, builds an OpenAI
    functions agent equipped with the rewrite and index tools, invokes it once
    per log entry and stores the list of results under
    ``state["optimizations"]``.

    Args:
        state: Mutable mapping holding the graph state.

    Returns:
        The same ``state`` object with the ``"optimizations"`` key set.
    """
    log_data = parse_query_log("query_log.csv")

    llm = ChatOpenAI(model="gpt-3.5-turbo")
    tools = [
        QueryRewriteTool(),
        CreateIndexTool(auto_create=False),  # Set to True if you want to auto-create indexes
    ]
    prompt = ChatPromptTemplate.from_messages([
        ("system", "You are a SQL query optimizer. Analyze the given query and its execution time, then decide whether to rewrite the query or suggest an index creation."),
        ("human", "Timestamp: {timestamp}\nQuery: {query}\nExecution Time: {execution_time}\nWhat optimization would you suggest?"),
        # create_openai_functions_agent requires an ``agent_scratchpad``
        # variable in the prompt; it carries the intermediate tool calls and
        # their results between agent steps.
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ])

    agent = create_openai_functions_agent(llm, tools, prompt)
    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

    # Each log entry already has the keys the prompt expects
    # (timestamp, query, execution_time), so it is passed as the agent input.
    state["optimizations"] = [
        agent_executor.invoke(query_info) for query_info in log_data
    ]
    return state
# Create the LangGraph
class OptimizerState(TypedDict, total=False):
    """Graph state shared between nodes."""

    # Agent results collected by the optimize_queries node.
    optimizations: List[dict]


# StateGraph is constructed from a state schema; nodes are registered
# afterwards with add_node() rather than passed to the constructor.
workflow = StateGraph(OptimizerState)
workflow.add_node("optimize_queries", optimize_queries)
# Set the entry point
workflow.set_entry_point("optimize_queries")
# The only node is also the last one, so the graph ends after it.
workflow.set_finish_point("optimize_queries")
# Compile the graph
app = workflow.compile()
# Run the optimizer (seed the state with an explicit key so the input is not empty)
result = app.invoke({"optimizations": []})
print(result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment