@alonsoir
Created March 26, 2025 12:10
An attempt to create a custom tool using agno: Delta Lake from Apache Spark.
import asyncio
import logging
import os
from textwrap import dedent

import requests
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from agno.agent import Agent
from agno.models.openai import OpenAIChat

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Load the API key
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    logging.error("El token de OpenAI no está configurado.")
    raise ValueError("El token de OpenAI no está configurado.")

# Configure Spark with local Delta Lake support
spark = SparkSession.builder \
    .appName("LocalDeltaLakeAgent") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.0") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.delta.logRetentionDuration", "interval 7 days") \
    .config("spark.delta.deletedFileRetentionDuration", "interval 7 days") \
    .getOrCreate()

# Custom tool for a local Delta Lake table
class DeltaLakeTools:
    def __init__(self, table_path):
        self.table_path = table_path
        os.makedirs(table_path, exist_ok=True)

    def initialize_from_remote_csv(self, url):
        local_csv_path = "temp_movies.csv"
        response = requests.get(url)
        response.raise_for_status()  # fail fast on a bad download
        with open(local_csv_path, "wb") as f:
            f.write(response.content)
        df = spark.read.option("header", "true").csv(local_csv_path)
        # Inspect the original columns
        original_columns = df.columns
        logging.info(f"Columnas originales del CSV: {original_columns}")
        # Define the expected columns (spaces/parentheses normalized to underscores)
        expected_columns = ["Rank", "Title", "Genre", "Description", "Director", "Actors", "Year", "Runtime_Minutes", "Rating", "Votes", "Revenue_Millions", "Metascore"]
        if len(original_columns) != len(expected_columns):
            logging.warning(f"El número de columnas no coincide. Original: {len(original_columns)}, Esperado: {len(expected_columns)}")
        # Rename positionally to the expected names
        df = df.toDF(*expected_columns[:len(original_columns)])
        df = df.withColumn("Rating", col("Rating").cast("float")).filter(col("Rating").isNotNull())
        df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(self.table_path)
        logging.info(f"Tabla Delta inicializada en {self.table_path} con datos de {url}")
        os.remove(local_csv_path)

    async def query(self, sql_query):
        """Run a query against the Delta table and return the result as Markdown."""
        logging.info(f"Ejecutando consulta: {sql_query}")
        loop = asyncio.get_event_loop()
        df = spark.read.format("delta").load(self.table_path)
        # NOTE: the incoming SQL string is never actually executed; this method only
        # keyword-matches on "AVG" and otherwise returns a fixed top-5 projection.
        # Blocking Spark/pandas work is pushed off the event loop via run_in_executor.
        if "AVG" in sql_query.upper():
            result = await loop.run_in_executor(
                None, lambda: df.agg({"Rating": "avg"}).toPandas().to_markdown()
            )
        else:
            result = await loop.run_in_executor(
                None,
                lambda: df.orderBy(df.Rating.desc()).limit(5).select("Title", "Rating").toPandas().to_markdown(),
            )
        logging.info(f"Resultado de la consulta: {result}")
        return result

    async def update_table(self, url):
        loop = asyncio.get_event_loop()
        local_csv_path = "temp_movies.csv"
        response = requests.get(url)
        response.raise_for_status()
        with open(local_csv_path, "wb") as f:
            f.write(response.content)
        df = spark.read.option("header", "true").csv(local_csv_path)
        original_columns = df.columns
        logging.info(f"Columnas originales del CSV (actualización): {original_columns}")
        expected_columns = ["Rank", "Title", "Genre", "Description", "Director", "Actors", "Year", "Runtime_Minutes", "Rating", "Votes", "Revenue_Millions", "Metascore"]
        df = df.toDF(*expected_columns[:len(original_columns)])
        df = df.withColumn("Rating", col("Rating").cast("float")).filter(col("Rating").isNotNull())
        # Run the blocking Delta write off the event loop
        await loop.run_in_executor(None, lambda: df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(self.table_path))
        logging.info(f"Tabla actualizada con datos de {url}")
        os.remove(local_csv_path)

# Configure tools and agent
local_table_path = "./delta_movies"
delta_tools = DeltaLakeTools(table_path=local_table_path)
delta_tools.initialize_from_remote_csv("https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv")

agent = Agent(
    model=OpenAIChat(id="gpt-4o", api_key=openai_api_key),
    tools=[delta_tools],
    markdown=True,
    show_tool_calls=True,
    additional_context=dedent("""\
        You have a Delta Lake table at ./delta_movies with IMDB data (columns: Title, Rating, Year, etc.).
        Rating is a numeric column (1-10). Use the 'query' tool for all data requests and return its Markdown result.
        - "What is the average rating?": Call 'query' with "SELECT AVG(Rating) AS Average_Rating FROM default.movies".
        - "List the top 5 movies by rating": Call 'query' with "SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5".
        Do NOT invent data or skip the tool. Return only the Markdown table from 'query'.
    """),
)

# Helper to exercise queries and updates
async def run_local_operations():
    print("Consulta inicial:")
    await agent.aprint_response("What is the average rating of movies?")
    print("\nActualizando tabla...")
    await delta_tools.update_table("https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv")
    print("\nConsulta tras actualización:")
    await agent.aprint_response("List the top 5 movies by rating.")
    print("\nVerificación manual:")
    result_avg = await delta_tools.query("SELECT AVG(Rating) AS Average_Rating FROM default.movies")
    print("Promedio manual:", result_avg)
    result_top5 = await delta_tools.query("SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5")
    print("Top 5 manual:", result_top5)

# Run the flow
asyncio.run(run_local_operations())

# Stop the Spark session
spark.stop()
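
Note: one likely reason the agent never returns tool output is that DeltaLakeTools is a plain class. agno discovers tools from callables or from Toolkit subclasses that register their methods, so passing a bare instance in tools=[...] may leave 'query' invisible to the model. Below is a minimal sketch of the Toolkit variant; it assumes agno exposes agno.tools.Toolkit with a register() method (verify against the installed version), and it actually executes the incoming SQL through a temp view instead of keyword-matching:

# Hypothetical rewrite of the tool as an agno Toolkit.
# Assumptions: agno.tools.Toolkit exists with name= and register();
# 'spark' is the SparkSession created in the script above.
from agno.tools import Toolkit

class DeltaLakeToolkit(Toolkit):
    def __init__(self, table_path: str):
        super().__init__(name="delta_lake_tools")
        self.table_path = table_path
        # Registered methods become tools the model can call by name
        self.register(self.query)

    def query(self, sql_query: str) -> str:
        """Run a SQL query against the Delta table and return it as Markdown."""
        df = spark.read.format("delta").load(self.table_path)
        # Expose the table under a name the SQL can reference
        df.createOrReplaceTempView("movies")
        # The agent prompt says "default.movies"; point it at the temp view
        sql = sql_query.replace("default.movies", "movies")
        return spark.sql(sql).toPandas().to_markdown()

# Usage: agent = Agent(model=..., tools=[DeltaLakeToolkit("./delta_movies")], ...)

With this shape, query() really runs whatever SQL the agent sends, so the Markdown result matches the request instead of a hard-coded branch.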
25/03/26 13:08:12 WARN Utils: Your hostname, MacBook-Pro-de-Alonso.local resolves to a loopback address: 127.0.0.1; using 192.168.1.114 instead (on interface en0)
25/03/26 13:08:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/aironman/git/python-samples-2025/.venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/aironman/.ivy2/cache
The jars for the packages stored in: /Users/aironman/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4c1ca62c-ca3a-4ee7-b820-2d5147d81522;1.0
confs: [default]
found io.delta#delta-spark_2.12;3.3.0 in central
found io.delta#delta-storage;3.3.0 in central
found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 194ms :: artifacts dl 7ms
:: modules in use:
io.delta#delta-spark_2.12;3.3.0 from central in [default]
io.delta#delta-storage;3.3.0 from central in [default]
org.antlr#antlr4-runtime;4.9.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-4c1ca62c-ca3a-4ee7-b820-2d5147d81522
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/5ms)
25/03/26 13:08:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2025-03-26 13:08:22,842 - INFO - Columnas originales del CSV: ['Rank', 'Title', 'Genre', 'Description', 'Director', 'Actors', 'Year', 'Runtime (Minutes)', 'Rating', 'Votes', 'Revenue (Millions)', 'Metascore']
25/03/26 13:08:24 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2025-03-26 13:08:33,302 - INFO - Tabla Delta inicializada en ./delta_movies con datos de https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv
Consulta inicial:
┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃
┃ What is the average rating of movies? ┃
┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
┏━ Response (1.7s) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃
┃ To find the average rating of movies, I will run the following query: ┃
┃ ┃
┃ ┃
┃ SELECT AVG(Rating) AS Average_Rating FROM default.movies ┃
┃ ┃
┃ ┃
┃ Let's see the result. ┃
┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
Actualizando tabla...
2025-03-26 13:08:36,075 - INFO - Columnas originales del CSV (actualización): ['Rank', 'Title', 'Genre', 'Description', 'Director', 'Actors', 'Year', 'Runtime (Minutes)', 'Rating', 'Votes', 'Revenue (Millions)', 'Metascore']
2025-03-26 13:08:38,385 - INFO - Tabla actualizada con datos de https://agno-public.s3.amazonaws.com/demo_data/IMDB-Movie-Data.csv
Consulta tras actualización:
┏━ Message ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃
┃ List the top 5 movies by rating. ┃
┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
┏━ Response (1.9s) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃
┃ ┃
┃ SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5 ┃
┃ ┃
┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
Verificación manual:
2025-03-26 13:08:40,249 - INFO - Ejecutando consulta: SELECT AVG(Rating) AS Average_Rating FROM default.movies
2025-03-26 13:08:42,067 - INFO - Resultado de la consulta: | | avg(Rating) |
|---:|--------------:|
| 0 | 15.4287 |
Promedio manual: | | avg(Rating) |
|---:|--------------:|
| 0 | 15.4287 |
2025-03-26 13:08:42,067 - INFO - Ejecutando consulta: SELECT Title, Rating FROM default.movies ORDER BY Rating DESC LIMIT 5
2025-03-26 13:08:42,450 - INFO - Resultado de la consulta: | | Title | Rating |
|---:|:--------------------|---------:|
| 0 | Forushande | 2016 |
| 1 | Kung Fu Panda 3 | 2016 |
| 2 | The Interview | 2014 |
| 3 | Idiocracy | 2006 |
| 4 | A Cure for Wellness | 146 |
Top 5 manual: | | Title | Rating |
|---:|:--------------------|---------:|
| 0 | Forushande | 2016 |
| 1 | Kung Fu Panda 3 | 2016 |
| 2 | The Interview | 2014 |
| 3 | Idiocracy | 2006 |
| 4 | A Cure for Wellness | 146 |
2025-03-26 13:08:42,451 - INFO - Closing down clientserver connection
2025-03-26 13:08:42,768 - INFO - Closing down clientserver connection
poetry run python delta_lake_data_analyst.py 3,09s user 1,03s system 11% cpu 34,959 total
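
A side note on the manual verification output above: an average rating of 15.43 is impossible on a 1-10 scale, and the top-5 Rating values (2016, 2016, 2014, 2006, 146) look like Year and Runtime values, which suggests rows are being split mid-record during CSV parsing. Note also that the Markdown header reads avg(Rating) rather than the requested Average_Rating alias, confirming the SQL string is not being executed. A plausible hardening, assuming the culprit is quoted Description/Actors fields containing embedded commas or newlines, is to make the reader quote-aware and multiline-tolerant (these are standard Spark CSV reader options; spark, col, logging and local_csv_path come from the script above):

# Hypothetical hardening of the CSV read in initialize_from_remote_csv
# and update_table.
df = (
    spark.read
    .option("header", "true")
    .option("multiLine", "true")  # tolerate newlines inside quoted fields
    .option("quote", '"')
    .option("escape", '"')        # handle doubled quotes inside fields
    .csv(local_csv_path)
)
df = df.withColumn("Rating", col("Rating").cast("float"))
# Sanity check: after a clean parse, every Rating should fall in [1, 10]
out_of_range = df.filter((col("Rating") < 1) | (col("Rating") > 10)).count()
logging.info(f"Rows with out-of-range Rating: {out_of_range}")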
@alonsoir (Author)

I've been trying all morning to use your software to implement a custom tool that uses Delta Lake, without much success. The agent doesn't seem to actually use the 'query' tool to run the SQL against the table, so it literally freaks out and doesn't show any data derived from the query. Is it possible to use custom tools?
