Skip to content

Instantly share code, notes, and snippets.

@koaning
Created October 24, 2025 08:34
Show Gist options
  • Save koaning/46cf5ce99c5f67e8357ee17347670032 to your computer and use it in GitHub Desktop.
Save koaning/46cf5ce99c5f67e8357ee17347670032 to your computer and use it in GitHub Desktop.
Logfire <-> marimo SQL

These classes allow you to define a DB connection that marimo understands.

import pyarrow as pa
from typing import Any, Dict, List, Optional, Tuple, Union


class LogfireDBError(Exception):
    """Base exception for Logfire DB API errors"""
    pass


class LogfireDBOperationalError(LogfireDBError):
    """Exception for operational errors (network, connection errors)"""
    pass


class LogfireDBProgrammingError(LogfireDBError):
    """Exception for programming errors (SQL syntax, etc.)"""
    pass


class LogfireDBCursor:
    """DB API 2.0 compatible cursor for Logfire"""

    def __init__(self, connection: 'LogfireDBConnection'):
        self.connection = connection
        self._results: List[Tuple] = []
        self._description: Optional[List[Tuple]] = None
        self._column_names: List[str] = []
        self.rowcount = -1
        self.arraysize = 1
        self._closed = False

    @property
    def description(self) -> Optional[List[Tuple]]:
        """Returns description of columns from last executed query"""
        return self._description

    def execute(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> None:
        """Execute a SQL query"""
        if self._closed:
            raise LogfireDBProgrammingError("Cursor is closed")

        # Handle parameterized queries (simple string substitution)
        if parameters:
            for key, value in parameters.items():
                # Simple parameter substitution - escape single quotes
                if isinstance(value, str):
                    escaped_value = value.replace("'", "''")
                    query = query.replace(f":{key}", f"'{escaped_value}'")
                elif value is None:
                    query = query.replace(f":{key}", "NULL")
                else:
                    query = query.replace(f":{key}", str(value))

        try:
            with LogfireQueryClient(read_token=self.connection.read_token) as client:
                # Execute query and get Arrow table
                arrow_table = client.query_arrow(sql=query)
                
                # Convert Arrow table to list of tuples
                self._results = []
                self._column_names = arrow_table.column_names
                
                # Convert each row to a tuple
                for batch in arrow_table.to_batches():
                    for i in range(batch.num_rows):
                        row = []
                        for col_idx, column in enumerate(batch.columns):
                            value = column[i].as_py()
                            row.append(value)
                        self._results.append(tuple(row))
                
                self.rowcount = len(self._results)
                
                # Set description based on Arrow schema
                self._description = []
                for field in arrow_table.schema:
                    # DB API 2.0 description format:
                    # (name, type_code, display_size, internal_size, precision, scale, null_ok)
                    type_name = str(field.type)
                    self._description.append(
                        (field.name, type_name, None, None, None, None, field.nullable)
                    )

        except Exception as e:
            if "syntax" in str(e).lower() or "parse" in str(e).lower():
                raise LogfireDBProgrammingError(f"Query execution failed: {e}")
            else:
                raise LogfireDBOperationalError(f"Query execution failed: {e}")

    def fetchone(self) -> Optional[Tuple]:
        """Fetch next row from query results"""
        if self._closed:
            raise LogfireDBProgrammingError("Cursor is closed")

        if not self._results:
            return None

        return self._results.pop(0)

    def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
        """Fetch multiple rows from query results"""
        if self._closed:
            raise LogfireDBProgrammingError("Cursor is closed")

        if size is None:
            size = self.arraysize

        results = []
        for _ in range(size):
            row = self.fetchone()
            if row is None:
                break
            results.append(row)

        return results

    def fetchall(self) -> List[Tuple]:
        """Fetch all remaining rows from query results"""
        if self._closed:
            raise LogfireDBProgrammingError("Cursor is closed")

        results = self._results.copy()
        self._results.clear()
        return results

    def fetchall_dict(self) -> List[Dict[str, Any]]:
        """Fetch all remaining rows as dictionaries (non-standard but useful)"""
        if self._closed:
            raise LogfireDBProgrammingError("Cursor is closed")

        results = []
        while self._results:
            row = self._results.pop(0)
            row_dict = {self._column_names[i]: value for i, value in enumerate(row)}
            results.append(row_dict)
        
        return results

    def close(self) -> None:
        """Close the cursor"""
        self._closed = True
        self._results.clear()
        self._description = None
        self._column_names.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class LogfireDBConnection:
    """DB API 2.0 compatible connection for Logfire"""

    def __init__(self, read_token: str):
        self.read_token = read_token
        self._closed = False

    def cursor(self) -> LogfireDBCursor:
        """Create a new cursor"""
        if self._closed:
            raise LogfireDBProgrammingError("Connection is closed")
        return LogfireDBCursor(self)

    def commit(self) -> None:
        """Commit transaction (no-op for Logfire)"""
        if self._closed:
            raise LogfireDBProgrammingError("Connection is closed")
        # Logfire is read-only, so this is a no-op
        pass

    def rollback(self) -> None:
        """Rollback transaction (no-op for Logfire)"""
        if self._closed:
            raise LogfireDBProgrammingError("Connection is closed")
        # Logfire is read-only, so this is a no-op
        pass

    def close(self) -> None:
        """Close the connection"""
        self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


def connect(read_token: str) -> LogfireDBConnection:
    """Create a connection to Logfire query API
    
    Args:
        read_token: Your Logfire read token (e.g., 'pylf_v1_eu_...')
    
    Returns:
        LogfireDBConnection: A DB API 2.0 compatible connection object
    
    Example:
        conn = connect(read_token='pylf_v1_eu_...')
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM records WHERE otel_scope_name = 'logfire.openai'")
        rows = cursor.fetchall()
    """
    return LogfireDBConnection(read_token)

From here, you can add this connection:

logfire_conn = connect(read_token=my_read_token)

And you can now select the logfire connection in SQL cells!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment