""" | |
...in which we use property-based / model-based testing to find | |
an eventually consistent way of synchronizing entries | |
from the main reader storage to an (unrelated) search index, | |
without coupling the storage and search with transactions / triggers. | |
A real model checker like TLA+ or Alloy may have been a better tool for this, | |
but I don't know how to use them. | |
This is the first step in https://github.com/lemon24/reader/issues/323. | |
--- | |
Note that even the current implementation does not update the search index | |
inside a trigger / transaction, because this would keep the database | |
write-locked for too long. | |
Instead, in triggers[1] we only record which entries need to be updated. | |
In a later search update step, we pre-process entries (e.g. strip HTML) | |
outside of a transaction, and only use transactions to atomically: | |
1. write the entry to the search index | |
2. clear the "needs to be updated" flag | |
3. ensure entries were not updated during processing; | |
we use entry.ts for this[2], but the entry hash[3] or | |
a random uniquely associated with the entry version would work too | |
In a world where the storage and the search index are separate systems, | |
1-2-3 above cannot happen atomically (hence, this). | |
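
As a rough sketch (hypothetical names, not the actual reader API),
the current implementation's per-entry update step looks something like:

    doc = process(entry)                   # pre-processing, outside a transaction
    with storage.transaction():
        if current_ts(entry) != entry.ts:  # 3. (checked first) entry changed
            return                         #    during processing; leave it flagged
        search.write(entry, doc)           # 1.
        clear_flag(entry)                  # 2.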

There are three events when an entry needs to be synced:

* update – the entry was added to or updated in the storage;
  also used when the title of a feed changes (not modeled here)
* delete – the entry was deleted from the storage
* move – the entry moved from one feed to another
  (change_feed_url()), but its content has not changed;
  equivalent to delete+update, and is only an optimization
  (see the example below)
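
Concretely, mirroring what change_feed_url() below records: moving entry 0
from feed 1 to feed 2, with current sequence s and fresh sequence s':

    changes[2, 0, s'] = 'update'   # index the entry under its new feed
    changes[1, 0, s] = 'delete'    # drop the copy under the old feed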

The property we are testing for is that after an arbitrary number
of entry changes and (possibly concurrent / partial) syncs,
a final sync will make the storage and search index match.

---

Assuming that (1) Sync models things correctly and
(2) there are no exceedingly rare bugs Hypothesis couldn't find,
a correct solution looks like this:

* Each entry has a "sequence" that changes every time the entry changes
  in a way the search index cares about. The sequence can be
  a global counter, a random number, or a timestamp;
  the only requirement is that it is never used again
  (or that reuse is extremely unlikely).
* Each sequence change gets recorded. For changes to an existing entry,
  an update+delete pair is recorded, carrying the new and the old
  sequence, respectively (see the trace after this list).
* Some time later, search processes pending changes and deletes them.
  For update, search checks the change sequence against the entry sequence;
  if the sequences do not match, the change is ignored
  (deleted without an update).
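
For example, using the (feed, id, seq) change keys of the model below,
updating entry (feed=0, id=0, text='a', seq=1) to ('b', 2) records
{(0, 0, 2): 'update', (0, 0, 1): 'delete'}; then:

    do_update(0, 0, 2)  ->  entry.seq == 2 matches, 'b' is written to search
    do_delete(0, 0, 1)  ->  the seq-1 search document (if any) is removed
    do_update(0, 0, 1)  ->  stale: entry.seq == 2 != 1, ignored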

---

Prior versions of this that modeled storage and search in separate classes[4]
supported the move event. I decided not having it is acceptable,
since it produces the same amount of (re-)processing as feed title changes,
which right now aren't optimized in any way.

[1]: https://github.com/lemon24/reader/blob/3.11/src/reader/_search.py#L247-L404
[2]: https://github.com/lemon24/reader/blob/3.11/src/reader/_search.py#L695-L707
[3]: https://github.com/lemon24/reader/blob/3.11/src/reader/_types.py#L166-L168
[4]: https://gist.github.com/lemon24/558955ad82ba2e4f50c0184c630c668c/e3da12e7b16a4f4bf63ed0f81b86f5ea1f2b42cd

"""
import random
from typing import NamedTuple

import hypothesis.strategies as st
from hypothesis import *
from hypothesis.stateful import *


class Entry(NamedTuple):
    text: str
    seq: int


# global counter used to generate entry sequences; per the docstring,
# any scheme works as long as values are (practically) never reused
_seq = 1000


def next_seq():
    global _seq
    _seq += 1
    return _seq


class Sync(RuleBasedStateMachine):
    """Model of things that can happen.

    See https://hypothesis.readthedocs.io/en/latest/stateful.html
    """

    def __init__(self):
        super().__init__()
        self.storage = {}  # {feed: {id: Entry}}
        self.changes = {}  # {(feed, id, seq): 'update' or 'delete'}
        self.search = {}   # {(feed, id, seq): text}

    @precondition(lambda self: not self.changes)
    @invariant()
    def in_sync(self):
        """The property: when all the changes have been processed,
        the storage and the search index match.
        """
        storage = [
            (feed, id, entry.text)
            for feed, entries in self.storage.items()
            for id, entry in entries.items()
        ]
        storage.sort()
        search = [
            (feed, id, text)
            for (feed, id, _), text in self.search.items()
        ]
        search.sort()
        assert storage == search, str(self)

    # storage methods

    feeds = Bundle("feeds")

    @rule(target=feeds, feed=st.integers(0, 3))
    def add_feed(self, feed):
        assume(feed not in self.storage)
        self.storage[feed] = {}
        return feed

    @rule(feed=feeds, id=st.integers(0, 2), text=st.sampled_from('abc'))
    def add_or_update_entry(self, feed, id, text, seq=None):
        # seq is only passed explicitly by the regression tests below
        if seq is None:
            seq = next_seq()
        try:
            old = self.storage[feed][id]
        except KeyError:
            old = None
        self.storage[feed][id] = Entry(text, seq)
        # record an update with the new seq and, if the entry already
        # existed, a delete with the old seq
        self.changes[feed, id, seq] = 'update'
        if old:
            self.changes[feed, id, old.seq] = 'delete'

    @rule(feed=feeds, data=st.data())
    def delete_entry(self, feed, data, *, id=None):
        entries = self.storage.get(feed)
        assume(entries)
        if id is None:
            id = data.draw(st.sampled_from(list(entries)), 'entry')
        entry = entries.pop(id)
        self.changes[feed, id, entry.seq] = 'delete'

    @rule(feed=consumes(feeds))
    def delete_feed(self, feed):
        entries = self.storage.pop(feed)
        if not entries:
            return
        for id, entry in entries.items():
            self.changes[feed, id, entry.seq] = 'delete'

    @rule(target=feeds, old=consumes(feeds), new=st.integers(0, 3))
    def change_feed_url(self, old, new):
        assume(new not in self.storage)
        old_entries = self.storage.pop(old)
        new_entries = self.storage[new] = {}
        for id, old_entry in old_entries.items():
            # give the entry a fresh seq, so a change recorded against the
            # old seq cannot clobber it (see test_late_change_of_reverted_move)
            new_entry = new_entries[id] = Entry(old_entry.text, next_seq())
            self.changes[new, id, new_entry.seq] = 'update'
            self.changes[old, id, old_entry.seq] = 'delete'
        return new

    # search methods

    @precondition(lambda self: self.changes)
    @rule(random=st.randoms())
    def update(self, random=random):
        # process all pending changes in one go, in arbitrary order
        changes = list(self.changes.items())
        random.shuffle(changes)
        for change in changes:
            self.do_change(change)
            self.change_done(change)

    # model interleaved updates to simulate concurrent synchronizations (?)

    todo_changes = Bundle("todo_changes")
    done_changes = Bundle("done_changes")

    @precondition(lambda self: self.changes)
    @rule(target=todo_changes, data=st.data())
    def get_change(self, data, *, key=None):
        if key is not None:
            return key, self.changes[key]
        return data.draw(st.sampled_from(list(self.changes.items())), 'change')

    @rule(target=done_changes, change=consumes(todo_changes))
    def do_change(self, change):
        (feed, id, seq), action = change
        getattr(self, 'do_' + action)(feed, id, seq)
        return change

    @rule(change=consumes(done_changes))
    def change_done(self, change):
        key, action = change
        # only delete the change if it was not re-recorded in the meantime
        if self.changes.get(key) == action:
            del self.changes[key]

    def do_update(self, feed, id, seq):
        try:
            entry = self.storage[feed][id]
        except KeyError:
            return
        if entry.seq != seq:
            # the entry changed after this update was recorded; ignore it
            return
        self.search[feed, id, seq] = entry.text

    def do_delete(self, feed, id, seq):
        # seq is part of the key, so only the matching version is deleted;
        # this is the do_delete() seq check test_late_change_of_old_update needs
        try:
            del self.search[feed, id, seq]
        except KeyError:
            pass

    def __str__(self):
        return '\n'.join(self._str_parts())

    def _str_parts(self):
        yield "---"
        yield "storage:"
        for feed, entries in sorted(self.storage.items()):
            for id, entry in sorted(entries.items()):
                yield f"  {feed} {id} {entry.text!r:>6} {entry.seq}"
        yield "search:"
        for (feed, id, seq), text in sorted(self.search.items()):
            yield f"  {feed} {id} {text!r:>6} {seq}"
        yield "changes:"
        for (feed, id, seq), action in sorted(self.changes.items()):
            yield f"  {feed} {id} {action:>6} {seq}"


def test_late_change_of_reverted_move():
    """Fixed by changing entry seq on change_feed_url()."""
    state = Sync()
    state.add_feed(feed=1)
    state.add_or_update_entry(feed=1, id=0, text='a', seq=123)
    state.update()
    state.change_feed_url(new=0, old=1)
    change = state.get_change(None, key=(1, 0, 123))
    state.change_feed_url(new=1, old=0)
    state.update()
    state.do_change(change=change)
    # AssertionError: assert [(1, 0, 'a')] == []
    state.in_sync()


def test_late_change_of_old_update():
    """Fixed by checking seq in do_delete()."""
    state = Sync()
    state.add_feed(feed=0)
    state.add_or_update_entry(feed=0, id=0, text='a', seq=123)
    change = state.get_change(None, key=(0, 0, 123))
    state.add_or_update_entry(feed=0, id=0, text='b', seq=456)
    state.update()
    state.do_change(change=change)
    # AssertionError: assert [(0, 0, 'a')] == [(0, 0, 'a'), (0, 0, 'a')]
    state.in_sync()


test_sync = Sync.TestCase
test_sync.settings = settings(
    # verbosity=Verbosity.debug,
    suppress_health_check=[HealthCheck.filter_too_much],
    max_examples=1000,
    deadline=1000,
)
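
# To run this yourself (assuming pytest and hypothesis are installed, and the
# file is saved as e.g. test_search_sync.py – a hypothetical name):
#
#     pytest test_search_sync.py
#
# Hypothesis runs up to max_examples=1000 random rule sequences against Sync,
# checking the in_sync invariant whenever there are no pending changes.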