These classes allow you to define a DB connection that marimo understands.
import re
from typing import Any, Dict, List, Optional, Tuple, Union

import pyarrow as pa
class LogfireDBError(Exception):
    """Root of the exception hierarchy used by the Logfire DB API layer.

    Catch this type to handle every error raised by the Logfire cursor and
    connection objects in one place.
    """
class LogfireDBOperationalError(LogfireDBError):
    """Operational failure such as a network or connection error."""
class LogfireDBProgrammingError(LogfireDBError):
    """Programming mistake such as invalid SQL syntax."""
class LogfireDBCursor:
    """DB API 2.0 compatible cursor for Logfire.

    Each call to :meth:`execute` runs one SQL query through a
    ``LogfireQueryClient`` and buffers the complete result set in memory as a
    list of tuples. The ``fetch*`` methods then hand those rows out in order.

    NOTE(review): ``LogfireQueryClient`` is not imported in the visible part
    of this file; presumably it comes from ``logfire.query_client`` - confirm
    that the import exists at module scope.
    """

    def __init__(self, connection: 'LogfireDBConnection'):
        """Create a cursor bound to *connection* (used for its read token)."""
        self.connection = connection
        # Buffered rows of the last query and the index of the next unread
        # row. An index is used instead of ``list.pop(0)`` so that reading a
        # row is O(1) rather than O(n).
        self._results: List[Tuple] = []
        self._row_index = 0
        self._description: Optional[List[Tuple]] = None
        self._column_names: List[str] = []
        self.rowcount = -1
        self.arraysize = 1
        self._closed = False

    @property
    def description(self) -> Optional[List[Tuple]]:
        """Return the column descriptions of the last executed query.

        Each entry is a 7-tuple ``(name, type_code, display_size,
        internal_size, precision, scale, null_ok)`` where ``type_code`` is the
        string form of the Arrow field type and the size/precision/scale
        slots are always ``None``. The value is ``None`` before the first
        successful query and after :meth:`close`.
        """
        return self._description

    def _check_open(self) -> None:
        """Raise ``LogfireDBProgrammingError`` if the cursor has been closed."""
        if self._closed:
            raise LogfireDBProgrammingError("Cursor is closed")

    @staticmethod
    def _substitute_parameters(query: str, parameters: Dict[str, Any]) -> str:
        """Return *query* with every ``:name`` placeholder replaced by a literal.

        Strings are single-quoted with embedded quotes doubled, ``None``
        becomes ``NULL`` and every other value is rendered with ``str()``.

        All placeholders are replaced in a single pass, so that:

        * a short name never clobbers a longer one (``:id`` vs ``:id2``);
        * text inserted for one parameter is never re-scanned for other
          placeholders;
        * a ``::`` sequence (e.g. a type cast) is never treated as a
          placeholder.

        This is plain text substitution with quote escaping, not real
        server-side parameter binding.
        """

        def render(value: Any) -> str:
            if isinstance(value, str):
                return "'" + value.replace("'", "''") + "'"
            if value is None:
                return "NULL"
            return str(value)

        literals = {str(key): render(value) for key, value in parameters.items()}
        # Longest names first so the alternation prefers the most specific key.
        names = sorted(literals, key=len, reverse=True)
        pattern = re.compile(
            r"(?<!:):(" + "|".join(re.escape(name) for name in names) + r")(?!\w)"
        )
        # A function replacement avoids backslash processing in the literals.
        return pattern.sub(lambda match: literals[match.group(1)], query)

    def execute(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> None:
        """Execute a SQL query and buffer its result set.

        Args:
            query: SQL text, optionally containing ``:name`` placeholders.
            parameters: Mapping of placeholder name to value; see
                :meth:`_substitute_parameters` for how values are rendered.

        Raises:
            LogfireDBProgrammingError: If the cursor is closed, or the
                failure message mentions "syntax" or "parse".
            LogfireDBOperationalError: For any other failure while running
                the query or converting its result.
        """
        self._check_open()

        if parameters:
            query = self._substitute_parameters(query, parameters)

        try:
            with LogfireQueryClient(read_token=self.connection.read_token) as client:
                # Execute query and get Arrow table
                arrow_table = client.query_arrow(sql=query)

                # Convert the Arrow table to a list of row tuples, one batch
                # at a time.
                rows: List[Tuple] = []
                for batch in arrow_table.to_batches():
                    columns = [column.to_pylist() for column in batch.columns]
                    rows.extend(
                        tuple(column[i] for column in columns)
                        for i in range(batch.num_rows)
                    )

                # DB API 2.0 description format:
                # (name, type_code, display_size, internal_size, precision, scale, null_ok)
                description = [
                    (field.name, str(field.type), None, None, None, None, field.nullable)
                    for field in arrow_table.schema
                ]
                # Copy so that close() never mutates a list owned by Arrow.
                column_names = list(arrow_table.column_names)
        except Exception as e:
            message = str(e).lower()
            if "syntax" in message or "parse" in message:
                raise LogfireDBProgrammingError(f"Query execution failed: {e}") from e
            raise LogfireDBOperationalError(f"Query execution failed: {e}") from e

        # Only replace the cursor state once the whole result converted
        # cleanly, so a failed query never leaves rows and description out of
        # step with each other.
        self._results = rows
        self._row_index = 0
        self._column_names = column_names
        self._description = description
        self.rowcount = len(rows)

    def fetchone(self) -> Optional[Tuple]:
        """Fetch the next row, or ``None`` when no rows remain."""
        self._check_open()
        if self._row_index >= len(self._results):
            return None
        row = self._results[self._row_index]
        self._row_index += 1
        return row

    def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
        """Fetch up to *size* rows (default: ``self.arraysize``).

        Returns fewer rows, possibly an empty list, when the result set runs
        out. A non-positive *size* yields an empty list.
        """
        self._check_open()
        if size is None:
            size = self.arraysize
        start = self._row_index
        stop = min(start + max(size, 0), len(self._results))
        self._row_index = stop
        return self._results[start:stop]

    def fetchall(self) -> List[Tuple]:
        """Fetch all remaining rows and release the buffered result set."""
        self._check_open()
        remaining = self._results[self._row_index:]
        self._results = []
        self._row_index = 0
        return remaining

    def fetchall_dict(self) -> List[Dict[str, Any]]:
        """Fetch all remaining rows as dictionaries (non-standard but useful).

        Keys are the column names of the last executed query.
        """
        self._check_open()
        names = self._column_names
        return [dict(zip(names, row)) for row in self.fetchall()]

    def close(self) -> None:
        """Close the cursor and drop any buffered results.

        After closing, ``execute`` and the ``fetch*`` methods raise
        ``LogfireDBProgrammingError``. Closing twice is harmless.
        """
        self._closed = True
        self._results = []
        self._row_index = 0
        self._description = None
        self._column_names = []

    def __enter__(self):
        """Return the cursor itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the cursor when the ``with`` block exits."""
        self.close()
class LogfireDBConnection:
    """DB API 2.0 compatible connection for Logfire.

    The connection only stores the read token and an open/closed flag;
    cursors created from it use the token when they run queries.
    """

    def __init__(self, read_token: str):
        """Remember *read_token* and mark the connection as open."""
        self._closed = False
        self.read_token = read_token

    def _require_open(self) -> None:
        """Raise ``LogfireDBProgrammingError`` once the connection is closed."""
        if self._closed:
            raise LogfireDBProgrammingError("Connection is closed")

    def cursor(self) -> LogfireDBCursor:
        """Return a new cursor bound to this connection."""
        self._require_open()
        return LogfireDBCursor(self)

    def commit(self) -> None:
        """Commit the transaction.

        Logfire is read-only, so nothing happens beyond the open check.
        """
        self._require_open()

    def rollback(self) -> None:
        """Roll back the transaction.

        Logfire is read-only, so nothing happens beyond the open check.
        """
        self._require_open()

    def close(self) -> None:
        """Mark the connection as closed."""
        self._closed = True

    def __enter__(self):
        """Return the connection itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection when the ``with`` block exits."""
        self.close()
def connect(read_token: str) -> LogfireDBConnection:
    """Create a connection to the Logfire query API.

    Args:
        read_token: Your Logfire read token (e.g., 'pylf_v1_eu_...')

    Returns:
        LogfireDBConnection: A DB API 2.0 compatible connection object

    Example:
        conn = connect(read_token='pylf_v1_eu_...')
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM records WHERE otel_scope_name = 'logfire.openai'")
        rows = cursor.fetchall()
    """
    return LogfireDBConnection(read_token)


# From here, you can add this connection:
logfire_conn = connect(read_token=my_read_token)
# And you can now select the logfire connection in SQL cells!