Skip to content

Instantly share code, notes, and snippets.

@YetAnotherMinion
Last active September 13, 2018 19:51
Show Gist options
  • Save YetAnotherMinion/f8b452901b99da7cedc730459ef2c5fe to your computer and use it in GitHub Desktop.
Save YetAnotherMinion/f8b452901b99da7cedc730459ef2c5fe to your computer and use it in GitHub Desktop.
key value db example
#! /usr/bin/env python3
from typing import Dict
from hypothesis import given, assume, reproduce_failure
import hypothesis.strategies as st
import unittest
import logging
class KeyValueStore:
def __init__(self):
self._storage = {}
def get(self, key: str):
logging.debug(f"KeyValueStore::get('{key}')")
return self._storage[key]
def put(self, values: Dict[str, str]):
logging.debug(f"KeyValueStore::put({values})")
logging.debug(f"KeyValueStore::_storage={self._storage}")
# TODO lock all the keys for a atomic transaction
self._storage.update(values)
class KeyValueHandle:
def __init__(self, store=None):
if store is None:
logging.debug("KeyValueHandle creates new empty KeyValueStore")
store = KeyValueStore()
self._store = store
self._transactions = []
def get(self, key: str):
logging.debug(f"KeyValueHandle::get('{key}')")
logging.debug(f"KeyValueHandle::_transactions={self._transactions}")
if self._transactions:
for i in range(len(self._transactions) , 0, -1):
i -= 1
if key in self._transactions[i]:
return self._transactions[i][key]
return self._store.get(key)
def put(self, key: str, value: str):
logging.debug(f"KeyValueHandle::put('{key}', '{value}')")
logging.debug(f"KeyValueHandle::_transactions={self._transactions}")
if len(self._transactions) == 0:
# If there are no transactions, just write the value to the store immediately
return self._store.put({key: value})
else:
self._transactions[-1][key] = value
def begin_transaction(self):
logging.debug(f"KeyValueHandle::begin_transaction()")
logging.debug(f"KeyValueHandle::_transactions={self._transactions}")
self._transactions.append({})
def commit_transaction(self):
logging.debug(f"KeyValueHandle::commit_transaction()")
logging.debug(f"KeyValueHandle::_transactions={self._transactions}")
# commits all open transactions at once to store
if len(self._transactions) == 0:
# TODO figure out which exception to raise
raise IndexError
# reuse the storage from the outermost transaction to hold the result
# of all nested transactions
result = self._transactions[0]
# walk from innermost transanaction to outermost transaction,
# overwriting values in outer transaction with inner transactions
for i in range(1, len(self._transactions)):
layer = self._transactions[i]
for key in layer:
result[key] = layer[key]
self._store.put(result)
self._transactions = []
def rollback_transaction(self):
logging.debug(f"KeyValueHandle::rollback_transaction()")
logging.debug(f"KeyValueHandle::_transactions={self._transactions}")
# will raise IndexError # TODO figure out which exception to raise instead of IndexError
self._transactions.pop()
class TestKeyValueHandle(unittest.TestCase):
@given(key=st.text(), value=st.text())
def test_rollback_transaction(self, key, value):
handle = KeyValueHandle()
handle.begin_transaction()
handle.put(key, value)
self.assertEqual(value, handle.get(key))
handle.rollback_transaction()
self.assertRaises(KeyError, handle.get, key)
@given(key=st.text(), value=st.text())
def test_commit_transaction(self, key, value):
handle = KeyValueHandle()
handle.begin_transaction()
handle.put(key, value)
handle.commit_transaction()
self.assertEqual(value, handle.get(key))
@given(key=st.text(), key2=st.text(), v1=st.text(), v2=st.text(), v3=st.text(), v4=st.text(), v9=st.text())
def test_nested_transaction(self, key, key2, v1, v2, v3, v4, v9):
assume(key != key2)
handle = KeyValueHandle()
handle.begin_transaction()
handle.put(key, v1)
handle.begin_transaction()
handle.put(key, v2)
handle.put(key, v3)
handle.put(key, v4)
handle.put(key2, v9)
handle.rollback_transaction()
self.assertRaises(KeyError, handle.get, key2)
self.assertEqual(v1, handle.get(key))
handle.rollback_transaction()
self.assertRaises(KeyError, handle.get, key)
@given(key=st.text(), v1=st.text(), v2=st.text())
def test_rollback_outside_transaction(self, key, v1, v2):
handle = KeyValueHandle()
handle.begin_transaction()
handle.put(key, v1)
handle.begin_transaction()
handle.put(key, v2)
handle.commit_transaction()
self.assertEqual(v2, handle.get(key))
self.assertRaises(IndexError, handle.rollback_transaction)
#@given(key=st.text(), v1=st.text(), v2=st.text(), v3=st.text())
#def test_rollback_outside_transaction(self, key, v1, v2):
# handle = KeyValueHandle()
# handle.begin_transaction()
# handle.put(key, v1)
# handle.begin_transaction()
# handle.put(key, v2)
# handle.commit_transaction()
# self.assertEqual(v2, handle.get(key))
# self.assertRaises(IndexError, handle.rollback_transaction)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.DEBUG)
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment