Created
November 6, 2025 15:23
-
-
Save koaning/09866121ab0740a0c3d5c6cf71d6283e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pyarrow as pa | |
| from typing import Any, Dict, List, Optional, Tuple, Union | |
| class LogfireDBError(Exception): | |
| """Base exception for Logfire DB API errors""" | |
| pass | |
| class LogfireDBOperationalError(LogfireDBError): | |
| """Exception for operational errors (network, connection errors)""" | |
| pass | |
| class LogfireDBProgrammingError(LogfireDBError): | |
| """Exception for programming errors (SQL syntax, etc.)""" | |
| pass | |
| class LogfireDBCursor: | |
| """DB API 2.0 compatible cursor for Logfire""" | |
| def __init__(self, connection: 'LogfireDBConnection'): | |
| self.connection = connection | |
| self._results: List[Tuple] = [] | |
| self._description: Optional[List[Tuple]] = None | |
| self._column_names: List[str] = [] | |
| self.rowcount = -1 | |
| self.arraysize = 1 | |
| self._closed = False | |
| @property | |
| def description(self) -> Optional[List[Tuple]]: | |
| """Returns description of columns from last executed query""" | |
| return self._description | |
| def execute(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> None: | |
| """Execute a SQL query""" | |
| if self._closed: | |
| raise LogfireDBProgrammingError("Cursor is closed") | |
| # Handle parameterized queries (simple string substitution) | |
| if parameters: | |
| for key, value in parameters.items(): | |
| # Simple parameter substitution - escape single quotes | |
| if isinstance(value, str): | |
| escaped_value = value.replace("'", "''") | |
| query = query.replace(f":{key}", f"'{escaped_value}'") | |
| elif value is None: | |
| query = query.replace(f":{key}", "NULL") | |
| else: | |
| query = query.replace(f":{key}", str(value)) | |
| try: | |
| with LogfireQueryClient(read_token=self.connection.read_token) as client: | |
| # Execute query and get Arrow table | |
| arrow_table = client.query_arrow(sql=query) | |
| # Convert Arrow table to list of tuples | |
| self._results = [] | |
| self._column_names = arrow_table.column_names | |
| # Convert each row to a tuple | |
| for batch in arrow_table.to_batches(): | |
| for i in range(batch.num_rows): | |
| row = [] | |
| for col_idx, column in enumerate(batch.columns): | |
| value = column[i].as_py() | |
| row.append(value) | |
| self._results.append(tuple(row)) | |
| self.rowcount = len(self._results) | |
| # Set description based on Arrow schema | |
| self._description = [] | |
| for field in arrow_table.schema: | |
| # DB API 2.0 description format: | |
| # (name, type_code, display_size, internal_size, precision, scale, null_ok) | |
| type_name = str(field.type) | |
| self._description.append( | |
| (field.name, type_name, None, None, None, None, field.nullable) | |
| ) | |
| except Exception as e: | |
| if "syntax" in str(e).lower() or "parse" in str(e).lower(): | |
| raise LogfireDBProgrammingError(f"Query execution failed: {e}") | |
| else: | |
| raise LogfireDBOperationalError(f"Query execution failed: {e}") | |
| def fetchone(self) -> Optional[Tuple]: | |
| """Fetch next row from query results""" | |
| if self._closed: | |
| raise LogfireDBProgrammingError("Cursor is closed") | |
| if not self._results: | |
| return None | |
| return self._results.pop(0) | |
| def fetchmany(self, size: Optional[int] = None) -> List[Tuple]: | |
| """Fetch multiple rows from query results""" | |
| if self._closed: | |
| raise LogfireDBProgrammingError("Cursor is closed") | |
| if size is None: | |
| size = self.arraysize | |
| results = [] | |
| for _ in range(size): | |
| row = self.fetchone() | |
| if row is None: | |
| break | |
| results.append(row) | |
| return results | |
| def fetchall(self) -> List[Tuple]: | |
| """Fetch all remaining rows from query results""" | |
| if self._closed: | |
| raise LogfireDBProgrammingError("Cursor is closed") | |
| results = self._results.copy() | |
| self._results.clear() | |
| return results | |
| def fetchall_dict(self) -> List[Dict[str, Any]]: | |
| """Fetch all remaining rows as dictionaries (non-standard but useful)""" | |
| if self._closed: | |
| raise LogfireDBProgrammingError("Cursor is closed") | |
| results = [] | |
| while self._results: | |
| row = self._results.pop(0) | |
| row_dict = {self._column_names[i]: value for i, value in enumerate(row)} | |
| results.append(row_dict) | |
| return results | |
| def close(self) -> None: | |
| """Close the cursor""" | |
| self._closed = True | |
| self._results.clear() | |
| self._description = None | |
| self._column_names.clear() | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.close() | |
| class LogfireDBConnection: | |
| """DB API 2.0 compatible connection for Logfire""" | |
| def __init__(self, read_token: str): | |
| self.read_token = read_token | |
| self._closed = False | |
| def cursor(self) -> LogfireDBCursor: | |
| """Create a new cursor""" | |
| if self._closed: | |
| raise LogfireDBProgrammingError("Connection is closed") | |
| return LogfireDBCursor(self) | |
| def commit(self) -> None: | |
| """Commit transaction (no-op for Logfire)""" | |
| if self._closed: | |
| raise LogfireDBProgrammingError("Connection is closed") | |
| # Logfire is read-only, so this is a no-op | |
| pass | |
| def rollback(self) -> None: | |
| """Rollback transaction (no-op for Logfire)""" | |
| if self._closed: | |
| raise LogfireDBProgrammingError("Connection is closed") | |
| # Logfire is read-only, so this is a no-op | |
| pass | |
| def close(self) -> None: | |
| """Close the connection""" | |
| self._closed = True | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.close() | |
| def connect(read_token: str) -> LogfireDBConnection: | |
| """Create a connection to Logfire query API | |
| Args: | |
| read_token: Your Logfire read token (e.g., 'pylf_v1_eu_...') | |
| Returns: | |
| LogfireDBConnection: A DB API 2.0 compatible connection object | |
| Example: | |
| conn = connect(read_token='pylf_v1_eu_...') | |
| cursor = conn.cursor() | |
| cursor.execute("SELECT * FROM records WHERE otel_scope_name = 'logfire.openai'") | |
| rows = cursor.fetchall() | |
| """ | |
| return LogfireDBConnection(read_token) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment