Last active
July 23, 2022 10:04
-
-
Save ahancock1/72e459e294875988f4e6138d348122de to your computer and use it in GitHub Desktop.
python reactive rx sqlite nosql json document database with filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
MIT License | |
Copyright (c) 2022 Adam Hancock | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in all | |
copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any, Dict, List, Protocol | |
from dataclasses import dataclass | |
from enum import Enum | |
from aiosqlite import Connection, Cursor, connect | |
from aiosqlite.context import contextmanager | |
from jsonpickle import encode, decode | |
import rx.operators as ops | |
from rx.subject import Subject | |
from rx.core import Observable | |
import re | |
def async_to_rx(source: Coroutine, loop = None) -> Observable: | |
if loop is None: | |
loop = asyncio.get_event_loop() | |
return (rx | |
.from_future( | |
loop.create_task( | |
source) | |
)) | |
KEY_ERROR = ValueError("key is None") | |
ITEM_ERROR = ValueError("item is None") | |
ARGS_ERROR = ValueError("args is None") | |
Query = str | Dict[str, Any | List[Any]] | |
class IDataStore(Protocol): | |
def changes(self) -> Observable: | |
... | |
def created(self) -> Observable: | |
... | |
def updated(self) -> Observable: | |
... | |
def deleted(self) -> Observable: | |
... | |
def get(self, args: Query = None) -> Observable: | |
... | |
def all(self, args: Query = None) -> Observable: | |
... | |
def insert(self, key: str, item: Any) -> Observable: | |
... | |
def update(self, key: str, item: Any) -> Observable: | |
... | |
def delete(self, key: str) -> Observable: | |
... | |
def count(self, args: Query = None) -> Observable: | |
... | |
def exists(self, key: str) -> Observable: | |
... | |
def save(self, key: str, item: Any) -> Observable: | |
... | |
def clear(self) -> Observable: | |
... | |
class EventType(Enum): | |
CREATED = 1 | |
UPDATED = 2 | |
DELETED = 3 | |
@dataclass | |
class DataEvent: | |
old_item: Any | |
new_item: Any | |
event_type: EventType | |
class DataStore: | |
def __init__(self, | |
file_path: str, | |
table_name: str) -> None: | |
super().__init__() | |
self._file = file_path | |
self._name = table_name | |
self._init = False | |
self._events = Subject() | |
def changes(self) -> Observable: | |
return self._events.pipe( | |
ops.as_observable()) | |
def created(self) -> Observable: | |
return self._events.pipe( | |
ops.filter( | |
lambda x: x.event_type == EventType.CREATED)) | |
def updated(self) -> Observable: | |
return self._events.pipe( | |
ops.filter( | |
lambda x: x.event_type == EventType.UPDATED)) | |
def deleted(self) -> Observable: | |
return self._events.pipe( | |
ops.filter( | |
lambda x: x.event_type == EventType.DELETED)) | |
def get(self, key: str) -> Observable: | |
if not key: | |
raise KEY_ERROR | |
async def _(): | |
async with self._connect() as db: | |
items = await self._select(db, key) | |
if not items: | |
return None | |
return items[0] | |
return async_to_rx(_()) | |
def all(self, args: Query = None) -> Observable: | |
async def _(): | |
async with self._connect() as db: | |
items = await self._select(db, args) | |
if not items: | |
return [] | |
return items | |
return async_to_rx(_()) | |
def insert(self, key: str, item: Any) -> Observable: | |
if not key: | |
raise KEY_ERROR | |
if not item: | |
raise ITEM_ERROR | |
async def _(): | |
async with self._connect() as db: | |
n = await self._count(db, key) | |
if n > 0: | |
raise ValueError(f"key {key} already exists") | |
await self._insert(db, key, item) | |
return async_to_rx(_()) | |
def update(self, key: str, item: Any) -> Observable: | |
if not key: | |
raise KEY_ERROR | |
if not item: | |
raise ITEM_ERROR | |
async def _(): | |
async with self._connect() as db: | |
n = await self._count(db, key) | |
if n == 0: | |
raise ValueError(f"key {key} not found") | |
await self._update(db, key, item) | |
return async_to_rx(_()) | |
def delete(self, key: str) -> Observable: | |
if not key: | |
raise KEY_ERROR | |
async def _(): | |
async with self._connect() as db: | |
await self._delete(db, key) | |
return async_to_rx(_()) | |
def count(self, args: Query = None) -> Observable: | |
async def _(): | |
async with self._connect() as db: | |
return await self._count(db, args) | |
return async_to_rx(_()) | |
def exists(self, args: Query) -> Observable: | |
if not args: | |
raise ARGS_ERROR | |
async def _(): | |
async with self._connect() as db: | |
return await self._count(db, args) > 0 | |
return async_to_rx(_()) | |
def save(self, key: str, item: Any) -> Observable: | |
if not key: | |
raise KEY_ERROR | |
if not item: | |
raise ITEM_ERROR | |
async def _(): | |
async with self._connect() as db: | |
if await self._count(db, key) > 0: | |
await self._update(db, key, item) | |
else: | |
await self._insert(db, key, item) | |
return async_to_rx(_()) | |
def clear(self) -> Observable: | |
async def _(): | |
async with self._connect() as db: | |
items = await self._select(db) | |
if not items: | |
return | |
await self._clear(db) | |
for item in items: | |
self._deleted(item) | |
return async_to_rx(_()) | |
@contextmanager | |
async def _connect(self) -> Connection: | |
db = await connect(self._file) | |
await self._ensure(db) | |
return db | |
@contextmanager | |
async def _execute(self, | |
db: Connection, | |
sql: str, | |
values: List[Any] = []) -> Cursor: | |
x = await db.execute(sql, values) | |
self._print(sql, values) | |
return x | |
async def _ensure(self, db: Connection) -> None: | |
if self._init: | |
return | |
sql = f""" | |
CREATE TABLE IF NOT EXISTS {self._name} | |
( | |
key TEXT PRIMARY KEY, | |
data TEXT NOT NULL | |
) | |
""" | |
await self._commit(db, sql) | |
self._init = True | |
async def _commit(self, | |
db: Connection, | |
sql: str, | |
values: List[Any] = []) -> None: | |
await self._execute(db, sql, values) | |
await db.commit() | |
async def _fetchone(self, | |
db: Connection, | |
sql: str, | |
values: List[Any] = []) -> Any: | |
async with self._execute(db, sql, values) as x: | |
item = await x.fetchone() | |
if item is None: | |
return | |
return item[0] | |
async def _fetchall(self, | |
db: Connection, | |
sql: str, | |
values: List[Any] = []) -> Observable: | |
async with self._execute(db, sql, values) as x: | |
items = await x.fetchall() | |
if not items: | |
return | |
return [item[0] for item in items] | |
def _where(self, args: Query) -> str: | |
match args: | |
case str(): | |
return f"WHERE key = ?" | |
case dict(): | |
clause = [f""" | |
JSON_EXTRACT(data, '$.{k}') | |
IN ({ | |
', '.join('?' * len(v)) | |
if isinstance(v, list | tuple) else | |
'?' | |
}) | |
""" for k, v in args.items()] | |
return "WHERE " + " AND ".join(clause) | |
case _: | |
return "" | |
def _values(self, args: Query) -> List[Any]: | |
match args: | |
case str(): | |
return [args] | |
case dict(): | |
return [ | |
x for v in args.values() | |
for x in | |
(v if isinstance(v, list | tuple) else [v]) | |
] | |
case _: | |
return [] | |
async def _select(self, | |
db: Connection, | |
args: Query = None) -> List[Any]: | |
sql = f""" | |
SELECT data | |
FROM {self._name} | |
{self._where(args)} | |
""" | |
values = self._values(args) | |
items = await self._fetchall(db, sql, values) | |
return [decode(item) for item in items] | |
async def _insert(self, | |
db: Connection, | |
key: str, | |
item: Any) -> None: | |
sql = f""" | |
INSERT INTO {self._name} ( key, data ) | |
VALUES ( ?, ? ) | |
""" | |
values = [key, encode(item)] | |
await self._commit(db, sql, values) | |
self._created(item) | |
async def _update(self, | |
db: Connection, | |
key: str, | |
item: Any) -> None: | |
new_item = item | |
old_item = await self._select(db, key) | |
if not old_item: | |
return | |
old_item = old_item[0] | |
if new_item == old_item: | |
return | |
sql = f""" | |
UPDATE {self._name} | |
SET data = ? | |
WHERE key = ? | |
""" | |
values = [encode(item), key] | |
await self._commit(db, sql, values) | |
self._updated(old_item, new_item) | |
async def _count(self, | |
db: Connection, | |
args: Query) -> int: | |
sql = f""" | |
SELECT COUNT(*) | |
FROM {self._name} | |
{self._where(args)} | |
""" | |
values = self._values(args) | |
return await self._fetchone( | |
db, sql, values) | |
async def _delete(self, db: Connection, key: str) -> None: | |
item = await self._select(db, key) | |
if not item: | |
return | |
sql = f""" | |
DELETE FROM {self._name} | |
WHERE key = ? | |
""" | |
await self._commit(db, sql, [key]) | |
self._deleted(item) | |
async def _clear(self, db: Connection) -> None: | |
sql = f""" | |
DELETE FROM {self._name} | |
""" | |
await self._commit(db, sql) | |
def _created(self, item: Any) -> None: | |
self._events.on_next( | |
DataEvent(None, item, EventType.CREATED)) | |
def _updated(self, old_item: Any, new_item: Any) -> None: | |
self._events.on_next( | |
DataEvent(old_item, new_item, EventType.UPDATED)) | |
def _deleted(self, item: Any) -> None: | |
self._events.on_next( | |
DataEvent(item, None, EventType.DELETED)) | |
def _print(self, sql: str, args: List[Any] | Any = None) -> None: | |
sql_msg = re.sub(" +", " ", sql.replace("\n", "")).strip() | |
if args is not None: | |
if not isinstance(args, list): | |
args = [args] | |
sql_msg = sql_msg.replace("?", "%s") | |
args = [ | |
f"'{x}'" if isinstance(x, str) else str(x) | |
for x in args | |
] | |
sql_msg = sql_msg % tuple(args) | |
print(sql_msg) | |
class Database: | |
_tables: Dict[str, DataStore] | |
def __init__(self, file_path: str): | |
assert file_path | |
self._file = file_path | |
self._tables = {} | |
def __getitem__(self, name: str) -> DataStore: | |
if name not in self._tables: | |
self._tables[name] = DataStore( | |
self._file, name | |
) | |
return self._tables[name] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment