Skip to content

Instantly share code, notes, and snippets.

@Tishka17
Last active April 17, 2020 16:47
Show Gist options
  • Save Tishka17/5bad4969a388b165efee2b874b8b9544 to your computer and use it in GitHub Desktop.
Save Tishka17/5bad4969a388b165efee2b874b8b9544 to your computer and use it in GitHub Desktop.
Transactional DB
class NullType:
    """Type of the ``NULL`` marker that stands for "no value stored"."""

    def __repr__(self):
        """Render the marker as the literal text ``NULL``."""
        return "NULL"


# Single shared instance used wherever a missing value has to be reported.
NULL = NullType()
class Db:
    """In-memory key/value store with nested transactions.

    ``storage`` holds the committed values. Every ``begin()`` pushes a new
    dict onto ``checkpoints``; while at least one checkpoint is open, writes
    go to the newest checkpoint and reads look through the checkpoints from
    newest to oldest before falling back to ``storage``.

    Inside a checkpoint the value ``None`` is a deletion marker (tombstone),
    so ``None`` is not usable as a regular stored value.
    """

    def __init__(self):
        self.storage = {}      # committed values
        self.checkpoints = []  # open transaction layers, oldest first

    def set(self, var, value):
        """Assign ``value`` to ``var`` in the innermost open transaction,
        or directly in the committed storage when no transaction is open."""
        if self.checkpoints:
            self.checkpoints[-1][var] = value
        else:
            self.storage[var] = value

    def unset(self, var):
        """Remove ``var``.

        Inside a transaction a tombstone is recorded so the removal can be
        rolled back. Outside a transaction the key is dropped immediately;
        removing a variable that does not exist is a no-op, matching the
        behaviour inside a transaction.
        """
        if self.checkpoints:
            self.checkpoints[-1][var] = None
        else:
            self.storage.pop(var, None)

    def get(self, var):
        """Return the current value of ``var``, or ``NULL`` when it has no
        value (never set, or unset)."""
        res = self._get(var)
        if res is None:
            return NULL
        return res

    def _get(self, var):
        """Return the raw value of ``var`` (``None`` when absent or deleted).

        The newest checkpoint that mentions ``var`` wins.
        """
        for layer in reversed(self.checkpoints):
            if var in layer:
                return layer[var]
        return self.storage.get(var)

    def count(self, value):
        """Return how many variables currently hold ``value``."""
        total = 0
        # Keys already decided by a newer layer must not be counted again
        # when they also appear in an older layer or in the base storage.
        visited = set()
        for layer in reversed(self.checkpoints):
            total += self._count(value, layer, visited)
        total += self._count(value, self.storage, visited)
        return total

    def _count(self, value, storage, visited):
        """Count keys of ``storage`` equal to ``value`` that are not yet in
        ``visited``; every key seen is added to ``visited``."""
        count = 0
        for k, v in storage.items():
            if k in visited:
                continue
            visited.add(k)
            # A tombstone hides the key and never counts as a match.
            if v is not None and v == value:
                count += 1
        return count

    def begin(self):
        """Open a new (possibly nested) transaction."""
        self.checkpoints.append({})

    def rollback(self):
        """Discard the innermost open transaction.

        Does nothing when no transaction is open.
        """
        if self.checkpoints:
            self.checkpoints.pop()

    def commit(self):
        """Apply every open transaction to the committed storage and close
        them all."""
        # Oldest first, so that newer layers override older ones.
        for layer in self.checkpoints:
            for k, v in layer.items():
                if v is None:
                    # The key may never have reached the base storage.
                    self.storage.pop(k, None)
                else:
                    self.storage[k] = v
        # Without this the applied layers would keep shadowing storage and
        # later writes would never be persisted.
        self.checkpoints.clear()
from core import Db
from parser import Command, parse_input, UnknownCommand
def step(command_mapping):
    """Prompt for one line, run the command it names and print the result.

    ``command_mapping`` maps a parsed command to the callable that handles
    it; the remaining words of the line are passed as positional arguments.
    A result of ``None`` is not printed. A ``TypeError`` raised while parsing
    or running the command is reported as "Invalid argument count", and
    ``UnknownCommand`` as "Unknown command".
    """
    line = input(">>> ")
    try:
        command, args = parse_input(line)
        handler = command_mapping[command]
        result = handler(*args)
        if result is not None:
            print(result)
    except UnknownCommand:
        print("Unknown command")
    except TypeError:
        print("Invalid argument count")
def end():
    """Stop the session by raising ``EOFError``."""
    raise EOFError()
def main():
    """Create a database and serve prompt commands until ``EOFError``."""
    database = Db()
    # Dispatch table: parsed command -> callable that carries it out.
    handlers = {
        Command.END: end,
        Command.BEGIN: database.begin,
        Command.ROLLBACK: database.rollback,
        Command.COMMIT: database.commit,
        Command.GET: database.get,
        Command.SET: database.set,
        Command.UNSET: database.unset,
        Command.COUNTS: database.count,
    }
    while True:
        try:
            step(handlers)
        except EOFError:
            # Raised by the END handler (and by input() at end of input).
            break
# Start the interactive prompt only when this file is run as a script.
if __name__ == "__main__":
    main()
from enum import Enum
class UnknownCommand(Exception):
    """Raised when an input line does not start with a known command word."""
class Command(Enum):
    """Command words accepted at the prompt.

    Each member's value is the upper-case word as typed by the user.
    """

    # Variable operations.
    SET = "SET"
    GET = "GET"
    UNSET = "UNSET"
    COUNTS = "COUNTS"
    # Transaction control.
    BEGIN = "BEGIN"
    ROLLBACK = "ROLLBACK"
    COMMIT = "COMMIT"
    # Ends the session.
    END = "END"
def parse_input(input_data: str):
    """Split one input line into a command and its arguments.

    The first whitespace-separated word names the command (matched
    case-insensitively). At most two arguments follow; the second one keeps
    any embedded whitespace because the line is split at most twice.

    Returns:
        A ``(Command, list[str])`` pair.

    Raises:
        UnknownCommand: if the line is empty or blank, or its first word is
            not a known command.
    """
    tokens = input_data.split(maxsplit=2)
    if not tokens:
        # A blank line has no command word; without this guard the
        # unpacking below fails with a bare ValueError.
        raise UnknownCommand("empty input")
    cmd_name, *params = tokens
    try:
        cmd = Command(cmd_name.upper())
    except ValueError as e:
        raise UnknownCommand from e
    return cmd, params
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment