Last active
June 19, 2023 17:09
-
-
Save TheArcherST/71bad07a98b363960f0b160aa647e70a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Event generation system | |
Assumes that we have some stream of data, and we want to synchronize | |
it with remote server with minimum network stress. Chosen solution is | |
to convert data stream to stream of data change events. This module | |
describes base units of convert system. | |
Specific features: | |
1. sources of the data are not eternal. when source expires, we must | |
accept the latest revision result. | |
Supported events: | |
1. CREATE (new record appeared in the source) | |
2. DELETE (record disappeared from the source) | |
3. PING (informational event. proof that record steel exists in the source) | |
Delete requests is producing only after accepting second revision of the | |
records source. | |
""" | |
from __future__ import annotations | |
import asyncio | |
import itertools | |
import random | |
from typing import Protocol, Type, Optional, Union | |
from enum import Enum | |
from dataclasses import dataclass | |
from hashlib import md5 | |
# ##### protocol for implementing specific data and source ##### | |
class Record(Protocol):
    """Structural interface for a single record fetched from a source."""

    def hash(self) -> str:
        """Return a stable string identifier derived from the record's data."""
class SourceExpired(Exception):
    """Raised when a records source is no longer available."""
class RecordsSource(Protocol):
    """Structural interface for an asynchronously iterable source of records."""

    def hash(self) -> str:
        """Hash method

        Returns a string identifier of the records source (used, for
        example, to derive a per-source storage name).

        :returns:
            string
        """

    async def __anext__(self) -> Record:
        """Next method

        Returns the next fetched record. Can perform IO requests. But there
        is an option to load all data during `load`, if loading all data in
        one request is a better strategy.

        :raise StopIteration:
            on records run out.  NOTE(review): raising StopIteration inside
            a coroutine surfaces to callers as ``RuntimeError`` (PEP 479);
            the consumer in this module accepts both that and the
            protocol-correct ``StopAsyncIteration``.
        :raise SourceExpired:
            on source expiration; means it is no more available and the
            last revision must be accepted as final.
        :returns:
            Awaitable[Record]
        """

    async def load(self, *, reload=False):
        """Load method

        Load the records source. This includes loading some meta information
        about the source. Optionally, all records in the source may be
        loaded eagerly here; that option is described in `__anext__`'s doc.

        :raise SourceExpired:
            on source expiration; means it is no more available and the
            last revision must be accepted as final.
        :returns:
            Awaitable[NoneType]
        """

    def __aiter__(self):
        return self
# ##### protocol for simple storage ##### | |
class Storage(Protocol):
    """Minimal async persistence interface: one named bucket of JSON-like data."""

    def set_name(self, name: str) -> None:
        """Bind this storage instance to a bucket name."""

    async def save(self, data: Union[dict, list]) -> None:
        """Persist *data*, replacing any previously saved content."""

    # fixed annotation: the default may be None, so the parameter is Optional
    async def load(self, default: Optional[Union[dict, list]] = None) -> Union[dict, list]:
        """Return the persisted content, or *default* when nothing was saved."""
# ##### events generation implementation ##### | |
class EventType(Enum):
    """Kinds of change events emitted for a record."""

    CREATE = 'CREATE'  # new record appeared in the source
    DELETE = 'DELETE'  # record disappeared from the source
    PING = 'PING'      # informational: record is still present
@dataclass
class Event:
    """A single change event produced from a records source.

    ``record`` is None for DELETE events (the record is gone from the
    source); in that case ``record_hash`` alone identifies what was
    deleted.
    """

    type: EventType
    record: Optional[Record] = None        # fixed annotation: defaults to None
    record_hash: Optional[str] = None      # fixed annotation: defaults to None
class EventStream:
    """Event stream object

    Wraps a records source into an asynchronous stream of change events.
    Each record seen yields CREATE (hash not yet known) or PING (hash
    already stored).  When the source is exhausted, stored hashes that
    were not seen during the current revision are emitted as DELETE
    events; afterwards the source is reloaded and a new revision begins.
    """

    def __init__(self,
                 storage: Storage,
                 records_source: RecordsSource):
        self._storage = storage
        self._records_source = records_source
        # hashes seen during the current revision; used to identify
        # records lost since the previous revision
        self._last_revision_cache = set()

    async def _resolve_next_lost_record(self) -> Optional[str]:
        """Pop one stored hash not seen this revision, or return None.

        The popped hash is removed from storage so each lost record
        produces exactly one DELETE event.
        """
        data: list[str] = await self._storage.load([])
        lost = set(data) - self._last_revision_cache
        if not lost:
            return None
        result = lost.pop()
        data.remove(result)
        await self._storage.save(data)
        return result

    async def _get_event_type(
            self,
            record_hash: str,
    ) -> EventType:
        """Classify *record_hash*: PING if already stored, else CREATE
        (and record it in storage)."""
        data: list[str] = await self._storage.load([])
        if record_hash in data:
            return EventType.PING
        data.append(record_hash)
        await self._storage.save(data)
        return EventType.CREATE

    async def __anext__(self) -> Event:
        # Loop rather than recurse: the original recursive retry could
        # exhaust the stack on a long run of empty revisions.
        while True:
            try:
                current_record = await anext(self._records_source)
            except (StopAsyncIteration, RuntimeError):
                # A source raising StopIteration from an async __anext__
                # surfaces here as RuntimeError (PEP 479); also accept the
                # protocol-correct StopAsyncIteration.
                if perform_delete_for := await self._resolve_next_lost_record():
                    return Event(EventType.DELETE,
                                 record_hash=perform_delete_for)
                # revision complete: reload the source and start a new one
                await self._records_source.load(reload=True)
                self._last_revision_cache.clear()
            else:
                record_hash = current_record.hash()
                self._last_revision_cache.add(record_hash)
                event_type = await self._get_event_type(record_hash)
                return Event(event_type, current_record, record_hash)

    def __aiter__(self):
        return self
class EventStreamsManager:
    """Round-robin multiplexer over one event stream per records source."""

    def __init__(self,
                 storage_type: Type[Storage]):
        self._storage_type = storage_type
        self._record_sources = []
        self._event_streams = []
        self._event_streams_iterator = iter([])

    def add_source(self, source: RecordsSource):
        """Register *source* and include it in the round-robin rotation."""
        self._record_sources.append(source)
        self._event_streams.append(self._build_event_stream(self._storage_type, source))
        # itertools.cycle caches the items it has seen, so the iterator
        # must be rebuilt after every membership change
        self._event_streams_iterator = itertools.cycle(self._event_streams)

    @staticmethod
    def _build_event_stream(storage_type: Type[Storage], source: RecordsSource):
        """Create an EventStream backed by a dedicated, source-named storage."""
        exact_storage = storage_type()
        exact_storage.set_name(source.hash())
        return EventStream(exact_storage, source)

    def _remove_stream(self, stream: EventStream):
        """Drop an expired stream and its source from the rotation."""
        index = self._event_streams.index(stream)
        del self._event_streams[index]
        del self._record_sources[index]
        self._event_streams_iterator = itertools.cycle(self._event_streams)

    async def __anext__(self):
        while True:
            try:
                current_stream = next(self._event_streams_iterator)
            except StopIteration:
                # no sources registered (or all expired): end the stream
                # cleanly instead of leaking StopIteration as RuntimeError
                raise StopAsyncIteration from None
            try:
                # fixed: the original returned self.__anext__() without
                # awaiting it, handing callers a coroutine object
                return await anext(current_stream)
            except SourceExpired:
                # implements the former "todo: implement source remove"
                self._remove_stream(current_stream)

    def __aiter__(self):
        return self
# ##### example ##### | |
@dataclass
class MyRecord(Record):
    """Example record: a plain named item, identified by the MD5 of its name."""

    name: str

    def hash(self):
        """Return the hex MD5 digest of the record's name."""
        digest = md5(self.name.encode())
        return digest.hexdigest()
class MyRecordsSource(RecordsSource):
    """Example in-memory records source backed by a (mutable) list of strings."""

    def __init__(self, name: str, data):
        self._data = []            # snapshot served by the current revision
        self._src = data           # live list; may change between revisions
        self._current_index = 0
        self._name = name

    def hash(self) -> str:
        """Return a string identifier for this source.

        Fixed: the inherited Protocol stub returned None, so every
        storage was named ``None``; the protocol requires a string.
        """
        return md5(self._name.encode()).hexdigest()

    async def load(self, *, reload=False):
        # snapshot the live list so mutations don't affect a running revision
        self._data = self._src.copy()
        self._current_index = 0

    async def __anext__(self) -> Record:
        if self._current_index >= len(self._data):
            # NOTE: StopIteration raised inside a coroutine surfaces to the
            # caller as RuntimeError (PEP 479); the consumer accepts that.
            raise StopIteration
        result = self._data[self._current_index]
        self._current_index += 1
        return MyRecord(result)

    def __hash__(self):
        return hash(self._name)
class MyStorage(Storage):
    """Example in-memory storage: keeps the saved object on the instance."""

    def __init__(self):
        self._data = None   # None means "nothing has been saved yet"
        self._name = None

    def set_name(self, name: str):
        self._name = name

    async def load(self, default: Optional[Union[dict, list]] = None) -> Union[dict, list]:
        # fixed: `self._data or default` masked a legitimately saved empty
        # list/dict (e.g. after every record was deleted) with the default;
        # only a missing save should fall back
        if self._data is not None:
            return self._data
        return default

    async def save(self, data: Union[dict, list]):
        self._data = data
async def main():
    """Demo: stream change events from two mutating in-memory sources."""
    src1 = MyRecordsSource("test1", ["1", "2", "3"])
    src2 = MyRecordsSource("test2", ["10", "20", "30"])

    manager = EventStreamsManager(MyStorage)
    manager.add_source(src1)
    manager.add_source(src2)

    async for event in manager:
        print(event)
        # emulate real-world changes: add one random record to each live
        # list, then drop one random existing record from each
        src1._data.append(str(random.randint(0, 9)))
        src2._data.append(str(random.randint(0, 9)))
        src1._data.pop(random.randint(0, len(src1._data) - 1))
        src2._data.pop(random.randint(0, len(src2._data) - 1))
        await asyncio.sleep(0.5)
if __name__ == '__main__':
    # run the demo event loop when executed as a script
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment