Last active
September 13, 2018 19:51
-
-
Save YetAnotherMinion/f8b452901b99da7cedc730459ef2c5fe to your computer and use it in GitHub Desktop.
key value db example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
from typing import Dict | |
from hypothesis import given, assume, reproduce_failure | |
import hypothesis.strategies as st | |
import unittest | |
import logging | |
class KeyValueStore:
    """In-memory string-to-string store backed by a single ``dict``."""

    def __init__(self) -> None:
        # The one and only backing container for committed values.
        self._storage: Dict[str, str] = {}

    def get(self, key: str) -> str:
        """Look up *key* and return its stored value.

        Raises:
            KeyError: if nothing has been stored under *key*.
        """
        logging.debug(f"KeyValueStore::get('{key}')")
        found = self._storage[key]
        return found

    def put(self, values: Dict[str, str]) -> None:
        """Merge *values* into the store, overwriting keys that already exist."""
        # Trace the incoming batch, then the state it is about to be merged into.
        incoming_msg = f"KeyValueStore::put({values})"
        logging.debug(incoming_msg)
        state_msg = f"KeyValueStore::_storage={self._storage}"
        logging.debug(state_msg)
        # TODO lock all the keys for a atomic transaction
        self._storage.update(values)
class KeyValueHandle:
    """Handle over a key-value store that adds nested, buffered transactions.

    Writes made while a transaction is open are held in a stack of dicts (one
    per ``begin_transaction`` call) and only reach the store on commit. Reads
    consult that stack from the innermost transaction outwards before falling
    back to the store.
    """

    def __init__(self, store=None):
        """Wrap *store*; when it is ``None`` a new ``KeyValueStore`` is created."""
        if store is None:
            logging.debug("KeyValueHandle creates new empty KeyValueStore")
            store = KeyValueStore()
        self._store = store
        # Stack of pending writes: index 0 is the outermost open transaction,
        # the last element is the innermost one.
        self._transactions = []

    def get(self, key: str):
        """Return the value currently visible for *key*.

        A value written in an open transaction shadows the stored one, and an
        inner transaction shadows an outer one. When no open transaction holds
        *key*, the lookup (and any exception it raises) is delegated to the
        store's ``get``.
        """
        logging.debug("KeyValueHandle::get('%s')", key)
        logging.debug("KeyValueHandle::_transactions=%s", self._transactions)
        # Innermost transaction wins, so search the stack from the top down.
        for pending in reversed(self._transactions):
            if key in pending:
                return pending[key]
        return self._store.get(key)

    def put(self, key: str, value: str):
        """Set *key* to *value*.

        With no open transaction the write goes straight to the store;
        otherwise it is buffered in the innermost open transaction.
        """
        logging.debug("KeyValueHandle::put('%s', '%s')", key, value)
        logging.debug("KeyValueHandle::_transactions=%s", self._transactions)
        if not self._transactions:
            # If there are no transactions, just write the value to the store immediately
            return self._store.put({key: value})
        self._transactions[-1][key] = value
        return None

    def begin_transaction(self):
        """Open a new transaction nested inside any that are already open."""
        logging.debug("KeyValueHandle::begin_transaction()")
        logging.debug("KeyValueHandle::_transactions=%s", self._transactions)
        self._transactions.append({})

    def commit_transaction(self):
        """Write all open transactions to the store at once and close them.

        The pending layers are flattened into a single batch; where several
        layers wrote the same key, the innermost value is the one committed.

        Raises:
            IndexError: if no transaction is open.
        """
        logging.debug("KeyValueHandle::commit_transaction()")
        logging.debug("KeyValueHandle::_transactions=%s", self._transactions)
        if not self._transactions:
            # TODO figure out which exception to raise
            raise IndexError("commit_transaction called with no open transaction")
        # Merge from the outermost to the innermost layer so that inner writes
        # overwrite outer ones. A fresh dict is used so the pending stack is
        # left untouched if the store write below raises.
        merged = {}
        for pending in self._transactions:
            merged.update(pending)
        self._store.put(merged)
        self._transactions = []

    def rollback_transaction(self):
        """Discard the innermost open transaction and everything written in it.

        Raises:
            IndexError: if no transaction is open.
        """
        logging.debug("KeyValueHandle::rollback_transaction()")
        logging.debug("KeyValueHandle::_transactions=%s", self._transactions)
        if not self._transactions:
            # TODO figure out which exception to raise instead of IndexError
            raise IndexError("rollback_transaction called with no open transaction")
        self._transactions.pop()
class TestKeyValueHandle(unittest.TestCase):
    """Property-based tests for the transaction behaviour of KeyValueHandle."""

    @given(key=st.text(), value=st.text())
    def test_rollback_transaction(self, key, value):
        """A value written inside a transaction is gone after rolling it back."""
        kv = KeyValueHandle()
        kv.begin_transaction()
        kv.put(key, value)
        self.assertEqual(value, kv.get(key))
        kv.rollback_transaction()
        with self.assertRaises(KeyError):
            kv.get(key)

    @given(key=st.text(), value=st.text())
    def test_commit_transaction(self, key, value):
        """A value written inside a transaction is readable after committing."""
        kv = KeyValueHandle()
        kv.begin_transaction()
        kv.put(key, value)
        kv.commit_transaction()
        self.assertEqual(value, kv.get(key))

    @given(key=st.text(), key2=st.text(), v1=st.text(), v2=st.text(), v3=st.text(), v4=st.text(), v9=st.text())
    def test_nested_transaction(self, key, key2, v1, v2, v3, v4, v9):
        """Rolling back an inner transaction restores the outer one's view."""
        assume(key != key2)
        kv = KeyValueHandle()
        kv.begin_transaction()
        kv.put(key, v1)

        # Inner transaction: overwrite `key` several times and add `key2`.
        kv.begin_transaction()
        for inner_value in (v2, v3, v4):
            kv.put(key, inner_value)
        kv.put(key2, v9)
        kv.rollback_transaction()

        # Only the outer write to `key` survives the inner rollback.
        with self.assertRaises(KeyError):
            kv.get(key2)
        self.assertEqual(v1, kv.get(key))

        kv.rollback_transaction()
        with self.assertRaises(KeyError):
            kv.get(key)

    @given(key=st.text(), v1=st.text(), v2=st.text())
    def test_rollback_outside_transaction(self, key, v1, v2):
        """Commit closes every nested transaction, so a later rollback fails."""
        kv = KeyValueHandle()
        kv.begin_transaction()
        kv.put(key, v1)
        kv.begin_transaction()
        kv.put(key, v2)
        kv.commit_transaction()
        self.assertEqual(v2, kv.get(key))
        with self.assertRaises(IndexError):
            kv.rollback_transaction()
if __name__ == "__main__":
    # Lower the root logger to DEBUG before handing control to the unittest runner.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment