Created
June 5, 2020 14:34
-
-
Save tripulse/ca98cfc705760aec4a0d137b7d0bf08c to your computer and use it in GitHub Desktop.
KVP (key-value pair) is a storage format similar to SQLite/INI, but more abstract.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
from functools import partial | |
def isbinaryfile(f, mode) -> bool:
    """Report whether file-like object `f` supports binary I/O in `mode`.

    The check is a zero-length probe, so no data is consumed or produced:
    for ``mode == 'r'`` it calls ``f.read(0)`` and expects ``b''`` back;
    for ``mode == 'w'`` it calls ``f.write(b'')`` and expects no error.

    Args:
        f: The object to probe.
        mode: ``'r'`` to test binary reading, ``'w'`` to test binary
            writing.

    Returns:
        True if the probe succeeded, False if it failed or if `mode`
        is neither ``'r'`` nor ``'w'``.
    """
    if mode == 'r':
        try:
            # A text stream returns '' here, which does not equal b''.
            return f.read(0) == b''
        except Exception:
            # Only ordinary errors mean "not readable"; KeyboardInterrupt
            # and SystemExit must propagate to the caller.
            return False
    if mode == 'w':
        try:
            f.write(b'')
        except Exception:
            return False
        return True
    # Unknown mode: answer with a real bool instead of falling through
    # to an implicit None.
    return False
class Pair:
    """A single key-value record made of raw bytes.

    Serialised layout, as produced by `tofile` and consumed by
    `fromfile`:

        1 byte    len(key) - 1
        N bytes   key
        5 bytes   len(value), big-endian
        M bytes   value
    """

    # One length byte stores len(key) - 1, so at most 256 key bytes fit.
    _MAX_KEY_LEN = 256
    # Largest length the 5-byte value-length field can represent. (A cap
    # of 2**40 would allow a value whose length cannot be encoded.)
    _MAX_VALUE_LEN = 2 ** 40 - 1
    # Width in bytes of the value-length field.
    _VALUE_LEN_SIZE = 5

    def __init__(self, key, value):
        """Initialise a `bytes` key-value pair.

        Both arguments are sliced to the format's maximum sizes and then
        copied into immutable `bytes`, so over-long input is silently
        truncated rather than rejected.

        Args:
            key: Sliceable bytes-like object; at most 256 bytes are kept.
            value: Sliceable bytes-like object; at most 2**40 - 1 bytes
                are kept.
        """
        self._key = bytes(key[:self._MAX_KEY_LEN])
        self._value = bytes(value[:self._MAX_VALUE_LEN])

    def __repr__(self):
        """Show the first 20 key bytes and the size of the value."""
        return f'{repr(self.key[:20])}({len(self.value)} bytes)'

    @property
    def key(self):
        """The key as `bytes`, clamped to the maximum key length."""
        return bytes(self._key[:self._MAX_KEY_LEN])

    @property
    def value(self):
        """The value as `bytes`, clamped to the maximum value length."""
        return bytes(self._value[:self._MAX_VALUE_LEN])

    @staticmethod
    def fromfile(f):
        """Parse a single key-value pair from a raw binary stream.

        Args:
            f: File-like object positioned at the start of a record.

        Returns:
            The parsed `Pair`, or `None` when the stream is exhausted,
            a read fails, or the record is truncated.

        Raises:
            AssertionError: If `f` does not support binary reads.
        """
        # Raised explicitly (not via `assert`) so the check survives
        # `python -O`; the exception type and message are unchanged.
        if not isbinaryfile(f, 'r'):
            raise AssertionError('unreadable binary file')

        try:
            # Indexing an empty read (end of stream) raises IndexError,
            # which is reported as "no pair" below.
            klen = f.read(1)[0] + 1
            k = f.read(klen)
            raw_vlen = f.read(Pair._VALUE_LEN_SIZE)
            vlen = int.from_bytes(raw_vlen, 'big')
            v = f.read(vlen)
        except Exception:
            # Best-effort parse: ordinary failures mean "no valid pair",
            # while KeyboardInterrupt/SystemExit still propagate.
            return None

        # A short length field would otherwise decode as a smaller number
        # (b'' decodes to 0), letting a record cut off right after its key
        # pass as a pair with an empty value.
        if len(raw_vlen) != Pair._VALUE_LEN_SIZE:
            return None
        if len(k) != klen or len(v) != vlen:
            return None
        return Pair(k, v)

    def tofile(self, f):
        """Serialise this pair to a binary stream in the layout above.

        Args:
            f: File-like object that accepts binary writes.

        Raises:
            AssertionError: If the key is empty or `f` does not support
                binary writes.
        """
        k, v = self.key, self.value

        # Explicit raises keep these checks active under `python -O`.
        if len(k) == 0:
            raise AssertionError('key length cannot be zero')
        if not isbinaryfile(f, 'w'):
            raise AssertionError('unwritable binary file')

        # The key length is stored minus one so that 1..256 fits one byte.
        f.write(bytes((len(k) - 1,)) + k)
        f.write(len(v).to_bytes(self._VALUE_LEN_SIZE, 'big') + v)
class KVP:
    """Reader/writer for a stream of `Pair` records behind a 4-byte file ID.

    Usage is operator based: ``kvp << pair`` appends a record and
    ``for pair in kvp`` reads records back.
    """

    # File ID expected at the start of a stream when reading, and
    # emitted first when writing.
    _MAGIC = b'KVP\xff'

    def __init__(self, f):
        """Write or read a key-value pair file based on the file given.

        If `f` supports binary reads, the next 4 bytes are consumed and
        must equal the file ID. Otherwise, if `f` supports binary writes,
        the file ID is written. A stream supporting neither is accepted
        here and rejected later by `<<` and iteration.

        Args:
            f: Binary file-like object to read from or write to.

        Raises:
            SyntaxError: If `f` is readable but does not start with the
                file ID.
        """
        self._f = f
        self.readable = lambda: isbinaryfile(f, 'r')
        self.writable = lambda: isbinaryfile(f, 'w')

        # NOTE(review): the readable branch wins for read/write streams,
        # so an empty one (e.g. a fresh io.BytesIO()) raises SyntaxError
        # instead of getting a header written; confirm this is intended.
        if self.readable():
            if self._f.read(len(self._MAGIC)) != self._MAGIC:
                raise SyntaxError('invalid file ID')
        elif self.writable():
            self._f.write(self._MAGIC)

    def __lshift__(self, p):
        """Append pair `p` to the underlying stream.

        Args:
            p: The `Pair` to serialise.

        Returns:
            This `KVP`, so writes can be chained: ``kvp << a << b``.

        Raises:
            io.UnsupportedOperation: If the stream is not writable.
            TypeError: If `p` is not a `Pair`.
        """
        if not self.writable():
            raise io.UnsupportedOperation('stream is not writable')
        if not isinstance(p, Pair):
            raise TypeError('not a key-value Pair type')
        p.tofile(self._f)
        return self

    def __iter__(self):
        """Iterate over the pairs remaining in the stream.

        Each step calls `Pair.fromfile` on the stream; iteration ends at
        the first `None` it returns, so end of stream and a malformed
        record both stop iteration without an error.

        Raises:
            io.UnsupportedOperation: If the stream is not readable.
        """
        if not self.readable():
            raise io.UnsupportedOperation('stream is not readable')
        return iter(partial(Pair.fromfile, self._f), None)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This code is released under GPL v2.0.