Skip to content

Instantly share code, notes, and snippets.

@kgantsov
Created October 21, 2025 18:06
Show Gist options
  • Select an option

  • Save kgantsov/b114b2df42fbabe11ba8e5020618e526 to your computer and use it in GitHub Desktop.

Select an option

Save kgantsov/b114b2df42fbabe11ba8e5020618e526 to your computer and use it in GitHub Desktop.
Minimalistic Peewee adapter for rqlite
import requests
import json
import time
from peewee import *
class RqliteCursor:
"""Minimal cursor-like wrapper for rqlite query responses."""
def __init__(self, rows=None, columns=None, lastrowid=None, rowcount=None):
self._rows = rows or []
self._columns = columns or []
self.lastrowid = lastrowid
self.rowcount = rowcount
self._index = 0
def fetchall(self):
remaining = self._rows[self._index :]
self._index = len(self._rows)
return remaining
def fetchone(self):
if self._index < len(self._rows):
row = self._rows[self._index]
self._index += 1
return row
return None
@property
def description(self):
return self._columns
def close(self):
"""Peewee calls this after iterating results. No-op for HTTP backend."""
pass
class RqliteDatabase(SqliteDatabase):
def __init__(self, url="http://localhost:4001", **kwargs):
# Peewee expects a filename, but we won’t use it
super().__init__(None, **kwargs)
self.url = url.rstrip("/")
def _execute(self, sql, params=None, is_query=False):
# rqlite only accepts fully interpolated SQL
if params:
sql = sql.replace("?", "{}").format(
*[json.dumps(p) for p in params]
)
endpoint = "/db/query" if is_query else "/db/execute"
payload = [sql]
resp = requests.post(f"{self.url}{endpoint}", json=payload)
resp.raise_for_status()
data = resp.json()
if "results" not in data or not data["results"]:
return RqliteCursor()
result = data["results"][0]
rows = result.get("values", [])
columns = result.get("columns", [])
return RqliteCursor(rows=rows, columns=columns)
def execute_sql(self, sql, params=None, commit=True):
sql = sql.strip()
is_query = sql.lower().startswith("select")
return self._execute(sql, params=params, is_query=is_query)
db = RqliteDatabase("http://localhost:4001")
class BaseModel(Model):
class Meta:
database = db
class User(BaseModel):
username = CharField(unique=True)
email = CharField()
active = BooleanField(default=True)
def main():
# Create table
db.create_tables([User])
# Insert rows
User.create(username="alice", email="[email protected]")
User.create(username="bob", email="[email protected]", active=False)
for user in User.select():
print(user.username, user.email, user.active)
query = User.update(active=True).where(User.username == "bob")
query.execute()
users = list(User.select())
print("After update:", len(users))
for u in users:
print(f"{u.username} -----> active={u.active}")
started = time.monotonic()
for x in range(1000):
User.create(username=f"user{x}", email=f"user{x}@example.com", active=x % 100 == 0)
print(f"Elapsed time: {time.monotonic() - started:.2f} seconds")
print()
print("Active users:")
for user in User.select().where(User.active == True):
print(user.username, user.email, user.active)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment