Last active
April 17, 2020 16:47
-
-
Save Tishka17/5bad4969a388b165efee2b874b8b9544 to your computer and use it in GitHub Desktop.
Transactional DB
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class NullType:
    """Type of the ``NULL`` marker object; it prints as the text ``NULL``."""

    def __repr__(self):
        # print() falls back to __repr__ here, so the marker shows up as NULL.
        return "NULL"


# Module-level instance; Db.get returns it when a variable has no value.
NULL = NullType()
class Db:
    """In-memory key-value store with nested transactions.

    Committed data lives in ``storage``. Every ``begin()`` pushes a new dict
    onto ``checkpoints``; while at least one checkpoint is open, writes go to
    the newest one and reads consult checkpoints newest-first before falling
    back to ``storage``. Inside a checkpoint the value ``None`` is a
    tombstone meaning "this variable was unset in this transaction".
    """

    def __init__(self):
        self.storage = {}      # committed variables
        self.checkpoints = []  # open transactions, oldest first

    def set(self, var, value):
        """Assign ``value`` to ``var`` in the innermost open transaction,
        or directly in committed storage when no transaction is open."""
        if self.checkpoints:
            self.checkpoints[-1][var] = value
        else:
            self.storage[var] = value

    def unset(self, var):
        """Remove ``var``; unsetting a variable that does not exist is a no-op."""
        if self.checkpoints:
            # Record a tombstone so the removal shadows older layers.
            self.checkpoints[-1][var] = None
        else:
            # pop() with a default: an unknown name must not raise KeyError.
            self.storage.pop(var, None)

    def get(self, var):
        """Return the current value of ``var``, or ``NULL`` if it has none."""
        res = self._get(var)
        if res is None:
            return NULL
        return res

    def _get(self, var):
        """Return the raw value of ``var`` (``None`` when missing or unset)."""
        for storage in reversed(self.checkpoints):
            if var in storage:
                return storage[var]
        return self.storage.get(var)

    def count(self, value):
        """Return how many variables currently hold ``value``."""
        total = 0
        # Names already decided by a newer layer; older layers must skip them.
        visited = set()
        for storage in reversed(self.checkpoints):
            total += self._count(value, storage, visited)
        total += self._count(value, self.storage, visited)
        return total

    def _count(self, value, storage, visited):
        """Count entries of one layer equal to ``value``, skipping names in
        ``visited`` and adding every name seen in this layer to it."""
        count = 0
        for k, v in storage.items():
            if k in visited:
                continue
            visited.add(k)
            if v == value:
                count += 1
        return count

    def begin(self):
        """Open a new (possibly nested) transaction."""
        self.checkpoints.append({})

    def rollback(self):
        """Discard the innermost open transaction.

        Does nothing when no transaction is open (previously this raised
        ``IndexError``).
        """
        if self.checkpoints:
            del self.checkpoints[-1]

    def commit(self):
        """Apply every open transaction to committed storage and close them all.

        Checkpoints are replayed oldest-first so newer writes win.
        """
        for storage in self.checkpoints:
            for k, v in storage.items():
                if v is None:
                    # The variable may never have reached committed storage
                    # (set and unset inside transactions), so tolerate absence.
                    self.storage.pop(k, None)
                else:
                    self.storage[k] = v
        # Without this the transactions stay open after the commit and later
        # writes keep landing in a checkpoint instead of committed storage.
        self.checkpoints.clear()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from core import Db | |
from parser import Command, parse_input, UnknownCommand | |
def step(command_mapping):
    """Prompt for one line, run the command it names, and print its result.

    ``command_mapping`` maps ``Command`` members to callables; the parsed
    parameters are passed to the callable positionally. A non-``None`` return
    value is printed. ``TypeError`` is reported as a wrong argument count and
    ``UnknownCommand`` as an unknown command; any other exception (including
    ``EOFError``) propagates to the caller.
    """
    input_data = input(">>> ")
    if not input_data.strip():
        # A blank line carries no command; just show the prompt again.
        return
    try:
        cmd, params = parse_input(input_data)
        res = command_mapping[cmd](*params)
        if res is not None:
            print(res)
    except TypeError:
        # Calling the handler with too few or too many parameters ends up here.
        print("Invalid argument count")
    except UnknownCommand:
        print("Unknown command")
def end():
    """Handler for the END command: signal shutdown by raising ``EOFError``."""
    raise EOFError()
def main():
    """Run the interactive shell over a fresh ``Db`` until ``EOFError`` is raised."""
    database = Db()
    # Dispatch table: each command is served by the matching Db method,
    # except END, which is handled by the module-level end().
    dispatch = {
        Command.END: end,
        Command.BEGIN: database.begin,
        Command.ROLLBACK: database.rollback,
        Command.COMMIT: database.commit,
        Command.GET: database.get,
        Command.SET: database.set,
        Command.UNSET: database.unset,
        Command.COUNTS: database.count,
    }
    try:
        while True:
            step(dispatch)
    except EOFError:
        # Raised by end() (and by input() when stdin is exhausted): leave quietly.
        return
# Start the shell only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum | |
class UnknownCommand(Exception):
    """Raised by parse_input when the command name is not a ``Command`` value."""
class Command(Enum):
    """Commands understood by the shell; each value is the upper-case keyword."""

    # Variable access
    SET = "SET"
    GET = "GET"
    UNSET = "UNSET"
    COUNTS = "COUNTS"

    # Transaction control
    BEGIN = "BEGIN"
    ROLLBACK = "ROLLBACK"
    COMMIT = "COMMIT"

    # Leave the shell
    END = "END"
def parse_input(input_data: str):
    """Split one input line into a ``Command`` and its list of parameters.

    The line is split on whitespace into at most three parts: the command
    name (matched case-insensitively) and up to two parameters, so the second
    parameter keeps any inner whitespace.

    Returns:
        A ``(command, params)`` tuple where ``params`` is a list of strings.

    Raises:
        UnknownCommand: if the line is empty or the first word is not a
            ``Command`` value.
    """
    parts = input_data.split(maxsplit=2)
    if not parts:
        # Empty or whitespace-only input has no command name; report it the
        # same way as any other unrecognised command instead of failing on
        # the unpacking below with a bare ValueError.
        raise UnknownCommand("empty input")
    cmd_name, *params = parts
    try:
        cmd = Command(cmd_name.upper())
    except ValueError as e:
        raise UnknownCommand from e
    return cmd, params
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment