Created
August 12, 2020 09:55
-
-
Save Cologler/1960afa7bf618e14a1912457a1f43616 to your computer and use it in GitHub Desktop.
sqlitedict with jsonref
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage example: store two JSON documents in one SqliteDict table, where
# document 'a' points at document 'k' through a "sqlitedict://./<key>"
# reference, then let jsonref resolve that reference via SqliteDictLoader.
with SqliteDict(
    './c.db',
    encode=json.dumps,
    decode=json.loads,
    autocommit=True,
) as db, contextlib.closing(SqliteDictLoader(db)) as sdloader:
    # 'a' embeds a reference to key 'k' of the same file and table.
    db['a'] = {'b': 2, 'c': {'$ref': 'sqlitedict://./k'}}
    db['k'] = {'f': 'feji'}

    # Register the sqlitedict-aware loader on the jsonref loader.
    ref_loader = Loader()
    ref_loader.loaders.append(sdloader)

    resolved = jsonref.loads(json.dumps(db['a']), loader=ref_loader)
    print(resolved)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# | |
# Copyright (c) 2020~2999 - Cologler <[email protected]> | |
# ---------- | |
# | |
# ---------- | |
import contextlib | |
import os | |
import json | |
from urllib import parse as urlparse | |
from sqlitedict import SqliteDict | |
import jsonref | |
class SqliteDictLoader:
    """Resolve ``sqlitedict://`` URIs to values stored in SqliteDict tables.

    Supported URI shapes (every component is percent-decoded):

    * ``sqlitedict://./key``                  - key in the base file and table
    * ``sqlitedict://../tablename/key``       - other table in the base file
    * ``sqlitedict://filename/tablename/key`` - table in another file

    Tables other than the base one are opened lazily in read-only mode,
    cached per ``(filename, tablename)`` and closed by :meth:`close`. The
    base dict passed to the constructor is never closed by this object.
    """

    # Must be a real tuple (note the trailing comma). With a bare string,
    # ``scheme in schemes`` is a substring test, so '' or 'sqlite' would match.
    schemes = ('sqlitedict',)

    def __init__(self, basedb):
        """Create a loader whose relative references resolve against *basedb*.

        *basedb* must expose ``filename`` and ``tablename`` attributes and
        support item lookup.
        """
        super().__init__()
        self._base_filename = os.path.normcase(basedb.filename)
        # Table names are lower-cased so cache keys compare case-insensitively.
        self._base_tablename = basedb.tablename.lower()
        # Cache of every usable dict, keyed by (filename, tablename).
        self._dicts = {(self._base_filename, self._base_tablename): basedb}
        # Only the dicts this loader opened itself (and therefore must close).
        self._opened_dicts = []

    def _resolve_location(self, urlparts):
        """Return ``(filename, tablename, key)`` for the split URI *urlparts*.

        Raises:
            NotImplementedError: single-segment path whose netloc is not ``.``.
            ValueError: the path has neither one nor two ``/`` separators.
        """
        netloc = urlparts.netloc
        # The text before the first '/' is ignored, as in the original code.
        segments = urlparts.path.split('/')[1:]

        if len(segments) == 1:
            if netloc != '.':
                raise NotImplementedError
            # ://./key
            key = urlparse.unquote(segments[0])
            return (self._base_filename, self._base_tablename, key)

        if len(segments) == 2:
            tablename = urlparse.unquote(segments[0]).lower()
            key = urlparse.unquote(segments[1])
            if netloc == '..':
                # ://../tablename/key
                return (self._base_filename, tablename, key)
            # ://filename/tablename/key
            filename = os.path.normcase(urlparse.unquote(netloc))
            return (filename, tablename, key)

        raise ValueError(urlparts)

    def resolve_object(self, scheme, uri, **kwargs):
        """Return the value stored at *uri*.

        *scheme* and *kwargs* are accepted for interface compatibility and
        are not used. The target table is opened read-only on first use.
        An error raised by the item lookup (e.g. a missing key) propagates
        to the caller.
        """
        urlparts = urlparse.urlsplit(uri)
        filename, tablename, key = self._resolve_location(urlparts)
        dictkey = (filename, tablename)
        d = self._dicts.get(dictkey)
        if d is None:
            d = SqliteDict(
                filename=filename,
                tablename=tablename,
                flag='r',
                encode=json.dumps,
                decode=json.loads,
            )
            self._opened_dicts.append(d)
            self._dicts[dictkey] = d
        return d[key]

    def close(self):
        """Close every dict this loader opened; safe to call repeatedly.

        Closed dicts are also dropped from the cache so that a later
        ``resolve_object`` call reopens them instead of reusing a closed
        handle. The base dict is left open and stays cached.
        """
        opened, self._opened_dicts = self._opened_dicts, []
        # Compare by identity: mapping equality could read the whole table.
        opened_ids = {id(d) for d in opened}
        self._dicts = {
            dictkey: d
            for dictkey, d in self._dicts.items()
            if id(d) not in opened_ids
        }
        for d in opened:
            d.close()
class Loader(jsonref.JsonLoader):
    """JsonLoader that tries registered scheme loaders before its parent.

    Each object appended to ``loaders`` must provide a ``schemes`` container
    and a ``resolve_object(scheme, uri, **kwargs)`` method.
    """

    def __init__(self):
        super().__init__()
        # Custom loaders, consulted in registration order.
        self.loaders = []

    def get_remote_json(self, uri, **kwargs):
        """Resolve *uri* with the first matching loader, else the base class."""
        uri_scheme = urlparse.urlsplit(uri).scheme
        handler = next(
            (
                candidate
                for candidate in self.loaders
                if uri_scheme in candidate.schemes
            ),
            None,
        )
        if handler is None:
            # No registered loader claims this scheme.
            return super().get_remote_json(uri, **kwargs)
        return handler.resolve_object(uri_scheme, uri, **kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment