Created
March 26, 2025 12:10
-
-
Save alonsoir/af298645bb6186c1e384d4d46cfb2a51 to your computer and use it in GitHub Desktop.
An attempt to create a custom tool using agno, backed by Delta Lake from Apache Spark.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
from textwrap import dedent | |
from pyspark.sql import SparkSession | |
from pyspark.sql.functions import col | |
from agno.agent import Agent | |
from agno.models.openai import OpenAIChat | |
import requests | |
from dotenv import load_dotenv | |
import logging | |
# Logging setup: timestamped, INFO-level messages.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Load the OpenAI API key from the environment (a .env file is supported)
# and fail fast with a clear message when it is missing.
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    _missing_key_msg = "El token de OpenAI no está configurado."
    logging.error(_missing_key_msg)
    raise ValueError(_missing_key_msg)
# Build a local Spark session with Delta Lake support.  The settings are
# collected in a dict and applied in order, which is equivalent to the
# usual chained .config(...) calls but easier to scan.
_spark_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.jars.packages": "io.delta:delta-spark_2.12:3.3.0",
    "spark.driver.host": "127.0.0.1",
    "spark.driver.bindAddress": "127.0.0.1",
    "spark.delta.logRetentionDuration": "interval 7 days",
    "spark.delta.deletedFileRetentionDuration": "interval 7 days",
}

_builder = SparkSession.builder.appName("LocalDeltaLakeAgent").master("local[*]")
for _key, _value in _spark_settings.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()
# Custom tool for a local Delta Lake table.
class DeltaLakeTools:
    """Toolset that lets an agent seed, refresh, and query a local Delta table.

    The table is populated from a remote CSV (IMDB movie data).  NOTE(review):
    ``query`` does not actually parse or execute the SQL string it receives —
    it only checks it for "AVG" and otherwise returns a fixed top-5-by-rating
    projection.  Only the two canned queries in the agent prompt are supported.
    """

    # Column names for the Delta table.  Spaces/parentheses from the CSV
    # header ("Runtime (Minutes)", "Revenue (Millions)") are normalized so
    # the names are valid without backticks.
    EXPECTED_COLUMNS = [
        "Rank", "Title", "Genre", "Description", "Director", "Actors",
        "Year", "Runtime_Minutes", "Rating", "Votes", "Revenue_Millions",
        "Metascore",
    ]

    def __init__(self, table_path):
        """Remember the table directory and create it if needed."""
        self.table_path = table_path
        os.makedirs(table_path, exist_ok=True)

    def _refresh_table(self, url, log_suffix=""):
        """Download *url*, clean the CSV, and overwrite the Delta table.

        Fixes two defects of the original pipeline:
        - The HTTP status is now checked (an error page used to be written
          to disk and parsed as CSV without complaint).
        - The CSV is read with multiLine/quote/escape options.  The IMDB
          Description fields are quoted and contain commas and newlines;
          without these options Spark misaligns those rows, which is why
          earlier runs showed years (2016, 2014, ...) in the Rating column
          and an average "rating" of 15.4 on a 1-10 scale.
        """
        local_csv_path = "temp_movies.csv"
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        with open(local_csv_path, "wb") as f:
            f.write(response.content)
        try:
            df = (
                spark.read
                .option("header", "true")
                .option("quote", '"')
                .option("escape", '"')
                .option("multiLine", "true")
                .csv(local_csv_path)
            )
            original_columns = df.columns
            logging.info(f"Columnas originales del CSV{log_suffix}: {original_columns}")
            if len(original_columns) != len(self.EXPECTED_COLUMNS):
                logging.warning(
                    f"El número de columnas no coincide. Original: {len(original_columns)}, "
                    f"Esperado: {len(self.EXPECTED_COLUMNS)}"
                )
            # Rename only as many columns as the CSV actually has.
            df = df.toDF(*self.EXPECTED_COLUMNS[: len(original_columns)])
            # Cast Rating to float and drop rows where the cast failed.
            df = df.withColumn("Rating", col("Rating").cast("float")).filter(
                col("Rating").isNotNull()
            )
            df.write.format("delta").mode("overwrite").option(
                "overwriteSchema", "true"
            ).save(self.table_path)
        finally:
            # The write above materializes the lazy read, so the temp file can
            # (and must, even on failure) be removed here — it used to leak
            # whenever the read or write raised.
            os.remove(local_csv_path)

    def initialize_from_remote_csv(self, url):
        """Create/overwrite the Delta table from the CSV at *url* (blocking)."""
        self._refresh_table(url)
        logging.info(f"Tabla Delta inicializada en {self.table_path} con datos de {url}")

    async def query(self, sql_query):
        """Run a canned query over the Delta table and return results as Markdown.

        "AVG" anywhere in *sql_query* selects the average-rating aggregate;
        anything else returns the top 5 titles by rating.  The Spark work is
        pushed to a thread so the event loop is not blocked (the original
        fetched the loop but never used it and ran Spark inline).
        """
        logging.info(f"Ejecutando consulta: {sql_query}")
        loop = asyncio.get_event_loop()

        def _run():
            df = spark.read.format("delta").load(self.table_path)
            if "AVG" in sql_query.upper():
                return df.agg({"Rating": "avg"}).toPandas().to_markdown()
            return (
                df.orderBy(df.Rating.desc())
                .limit(5)
                .select("Title", "Rating")
                .toPandas()
                .to_markdown()
            )

        result = await loop.run_in_executor(None, _run)
        logging.info(f"Resultado de la consulta: {result}")
        return result

    async def update_table(self, url):
        """Re-download *url* and overwrite the Delta table off the event loop."""
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(
            None, lambda: self._refresh_table(url, " (actualización)")
        )
        logging.info(f"Tabla actualizada con datos de {url}")
# Set up the Delta Lake tool against a local table directory and seed it
# with the public IMDB demo dataset.
local_table_path = "./delta_movies"
delta_tools = DeltaLakeTools(table_path=local_table_path)
delta_tools.initialize_from_remote_csv(
    "https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv"
)
# Configure the agent: GPT-4o with the Delta Lake toolkit attached.
# NOTE(review): `tools=[delta_tools]` passes the toolkit instance itself;
# this assumes agno exposes its public async `query` method as a callable
# tool — confirm against the installed agno version's custom-tool API,
# since the run output suggests the model never actually invoked the tool.
# The additional_context pins the two supported questions to exact SQL
# strings so the model calls 'query' instead of inventing data.
agent = Agent(
    model=OpenAIChat(id="gpt-4o", api_key=openai_api_key),
    tools=[delta_tools],
    markdown=True,
    show_tool_calls=True,
    additional_context=dedent("""\
You have a Delta Lake table at ./delta_movies with IMDB data (columns: Title, Rating, Year, etc.).
Rating is a numeric column (1-10). Use the 'query' tool for all data requests and return its Markdown result.
- "What is the average rating?": Call 'query' with "SELECT AVG(Rating) AS Average_Rating FROM default.movies".
- "List the top 5 movies by rating": Call 'query' with "SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5".
Do NOT invent data or skip the tool. Return only the Markdown table from 'query'.
"""),
)
# Demo driver exercising queries and a table refresh.
async def run_local_operations():
    """End-to-end check: ask the agent, refresh the table, then verify by
    calling the Delta tool directly (bypassing the model)."""
    print("Consulta inicial:")
    await agent.aprint_response("What is the average rating of movies?")

    print("\nActualizando tabla...")
    await delta_tools.update_table(
        "https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv"
    )

    print("\nConsulta tras actualización:")
    await agent.aprint_response("List the top 5 movies by rating.")

    # Direct tool calls, independent of the agent, to confirm the data layer.
    print("\nVerificación manual:")
    avg_markdown = await delta_tools.query(
        "SELECT AVG(Rating) AS Average_Rating FROM default.movies"
    )
    print("Promedio manual:", avg_markdown)
    top5_markdown = await delta_tools.query(
        "SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5"
    )
    print("Top 5 manual:", top5_markdown)
# Run the demo flow.  The __main__ guard prevents the pipeline from firing on
# import, and try/finally guarantees the Spark session is shut down — the
# original skipped spark.stop() whenever run_local_operations raised, leaving
# the JVM-backed session alive.
if __name__ == "__main__":
    try:
        asyncio.run(run_local_operations())
    finally:
        spark.stop()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
25/03/26 13:08:12 WARN Utils: Your hostname, MacBook-Pro-de-Alonso.local resolves to a loopback address: 127.0.0.1; using 192.168.1.114 instead (on interface en0) | |
25/03/26 13:08:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address | |
:: loading settings :: url = jar:file:/Users/aironman/git/python-samples-2025/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml | |
Ivy Default Cache set to: /Users/aironman/.ivy2/cache | |
The jars for the packages stored in: /Users/aironman/.ivy2/jars | |
io.delta#delta-spark_2.12 added as a dependency | |
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4c1ca62c-ca3a-4ee7-b820-2d5147d81522;1.0 | |
confs: [default] | |
found io.delta#delta-spark_2.12;3.3.0 in central | |
found io.delta#delta-storage;3.3.0 in central | |
found org.antlr#antlr4-runtime;4.9.3 in central | |
:: resolution report :: resolve 194ms :: artifacts dl 7ms | |
:: modules in use: | |
io.delta#delta-spark_2.12;3.3.0 from central in [default] | |
io.delta#delta-storage;3.3.0 from central in [default] | |
org.antlr#antlr4-runtime;4.9.3 from central in [default] | |
--------------------------------------------------------------------- | |
| | modules || artifacts | | |
| conf | number| search|dwnlded|evicted|| number|dwnlded| | |
--------------------------------------------------------------------- | |
| default | 3 | 0 | 0 | 0 || 3 | 0 | | |
--------------------------------------------------------------------- | |
:: retrieving :: org.apache.spark#spark-submit-parent-4c1ca62c-ca3a-4ee7-b820-2d5147d81522 | |
confs: [default] | |
0 artifacts copied, 3 already retrieved (0kB/5ms) | |
25/03/26 13:08:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable | |
Setting default log level to "WARN". | |
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). | |
2025-03-26 13:08:22,842 - INFO - Columnas originales del CSV: ['Rank', 'Title', 'Genre', 'Description', 'Director', 'Actors', 'Year', 'Runtime (Minutes)', 'Rating', 'Votes', 'Revenue (Millions)', 'Metascore'] | |
25/03/26 13:08:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'. | |
2025-03-26 13:08:33,302 - INFO - Tabla Delta inicializada en ./delta_movies con datos de https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv | |
Consulta inicial: | |
▰▰▰▱▱▱▱ Thinking... | |
┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ | |
▰▰▰▰▰▰▰ Thinking... | |
┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ | |
┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ | |
┃ ┃ | |
┃ What is the average rating of movies? ┃ | |
┃ ┃ | |
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ | |
┏━ Response (1.7s) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ | |
┃ ┃ | |
┃ To find the average rating of movies, I will run the following query: ┃ | |
┃ ┃ | |
┃ ┃ | |
┃ SELECT AVG(Rating) AS Average_Rating FROM default.movies ┃ | |
┃ ┃ | |
┃ ┃ | |
┃ Let's see the result. ┃ | |
┃ ┃ | |
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ | |
Actualizando tabla... | |
2025-03-26 13:08:36,075 - INFO - Columnas originales del CSV (actualización): ['Rank', 'Title', 'Genre', 'Description', 'Director', 'Actors', 'Year', 'Runtime (Minutes)', 'Rating', 'Votes', 'Revenue (Millions)', 'Metascore'] | |
2025-03-26 13:08:38,385 - INFO - Tabla actualizada con datos de https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv | |
Consulta tras actualización: | |
▰▰▰▱▱▱▱ Thinking... | |
┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ | |
▰▱▱▱▱▱▱ Thinking... | |
┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ | |
┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ | |
┃ ┃ | |
┃ List the top 5 movies by rating. ┃ | |
┃ ┃ | |
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ | |
┏━ Response (1.9s) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ | |
┃ ┃ | |
┃ ┃ | |
┃ SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5 ┃ | |
┃ ┃ | |
┃ ┃ | |
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ | |
Verificación manual: | |
2025-03-26 13:08:40,249 - INFO - Ejecutando consulta: SELECT AVG(Rating) AS Average_Rating FROM default.movies | |
2025-03-26 13:08:42,067 - INFO - Resultado de la consulta: | | avg(Rating) | | |
|---:|--------------:| | |
| 0 | 15.4287 | | |
Promedio manual: | | avg(Rating) | | |
|---:|--------------:| | |
| 0 | 15.4287 | | |
2025-03-26 13:08:42,067 - INFO - Ejecutando consulta: SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5 | |
2025-03-26 13:08:42,450 - INFO - Resultado de la consulta: | | Title | Rating | | |
|---:|:--------------------|---------:| | |
| 0 | Forushande | 2016 | | |
| 1 | Kung Fu Panda 3 | 2016 | | |
| 2 | The Interview | 2014 | | |
| 3 | Idiocracy | 2006 | | |
| 4 | A Cure for Wellness | 146 | | |
Top 5 manual: | | Title | Rating | | |
|---:|:--------------------|---------:| | |
| 0 | Forushande | 2016 | | |
| 1 | Kung Fu Panda 3 | 2016 | | |
| 2 | The Interview | 2014 | | |
| 3 | Idiocracy | 2006 | | |
| 4 | A Cure for Wellness | 146 | | |
2025-03-26 13:08:42,451 - INFO - Closing down clientserver connection | |
2025-03-26 13:08:42,768 - INFO - Closing down clientserver connection | |
poetry run python delta_lake_data_analyst.py 3,09s user 1,03s system 11% cpu 34,959 total | |
python-samples-2025-py3.10┌<▸> ~/g/p/s/p/agno | |
└➤ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've been trying all morning to use your software to implement a custom tool that uses Delta Lake, without much success. The agent doesn't seem to actually execute the query against the table, so it hallucinates instead of showing any data derived from the query. Is it possible to use custom tools this way?