Last active
December 3, 2015 17:32
-
-
Save drozzy/0f1ca91f78b8ab8b4e72 to your computer and use it in GitHub Desktop.
Event system based on the git model
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
from uuid import uuid4 | |
class Event(object):
    """A single history record: a payload paired with its identifier."""

    def __init__(self, id_, value):
        # id_ identifies the record; value is the wrapped payload.
        self.id_, self.value = id_, value
class Aggregate(object):
    """Base class for an aggregate with a git-like event history.

    An aggregate keeps a local, append-only list of ``Event`` records
    (``self.events``, oldest first) and, for every remote aggregate it
    follows, the id of the last remote event it has merged
    (``self.remotes``).

    Subclasses are expected to override ``commit``, ``merge`` and ``tick``;
    the implementations here do nothing.
    """

    def __init__(self):
        # remote aggregate -> id of the last event merged from that remote
        # (None until something has been pulled from it).
        self.remotes = {}
        # Local history, oldest event first.
        self.events = []

    def add(self, events):
        """Record one event payload, or a list of payloads, locally.

        Each payload is wrapped in an ``Event`` with a fresh ``uuid4`` id,
        appended to ``self.events`` and then handed to ``commit``.
        """
        payloads = events if isinstance(events, list) else [events]
        for payload in payloads:
            self.events.append(Event(uuid4(), payload))
            self.commit(payload)

    def add_remote(self, remote):
        """Start following ``remote``; nothing has been merged from it yet."""
        # None means "no event of this remote is known", so the next pull
        # retrieves its whole history.
        self.remotes[remote] = None

    def commit(self, event):
        """(Abstract) Apply a locally added event to this aggregate's state."""
        pass

    def merge(self, event):
        """(Abstract) Handle an event coming from a different aggregate."""
        pass

    def fetch(self, remotes):
        """Retrieve the not-yet-merged events of the given remotes.

        Remotes that are not followed yet are registered on the fly.

        Returns a dict mapping each remote to the events it reports via
        ``log`` since the last id recorded for it in ``self.remotes``.
        """
        ret = {}
        for remote in remotes:
            assert not isinstance(remote, list)
            assert isinstance(remote, Aggregate)
            last_seen = self.remotes.setdefault(remote, None)
            ret[remote] = remote.log(last_seen)
        return ret

    def pull(self, remotes):
        """Fetch new events from ``remotes`` (a list) and ``merge`` each one.

        After every merged event its id is stored as the last known id of
        that remote, so a later pull only sees newer events.
        """
        assert isinstance(remotes, list)
        # fetch() returns {remote: events}; items() works on Python 2 and 3
        # (iteritems() exists only on Python 2).
        for remote, evts in self.fetch(remotes).items():
            assert isinstance(remote, Aggregate)
            # Snapshot first: merge() may add events while we iterate.
            for evt in list(evts):
                self.merge(evt.value)
                self.remotes[remote] = evt.id_

    def log(self, id_=None):
        """Return the events recorded after ``id_``, oldest first.

        With ``id_`` set to None the whole history is returned.  The result
        is always a new list in chronological order; ``pull`` relies on that
        order so that the id it remembers is the newest one it merged.
        (Returning the history newest-first here made ``pull`` remember the
        oldest id and merge the same events again on the next call.)

        An ``id_`` that is not part of the history yields an empty list.
        """
        if id_ is None:
            print("id_ is none")
            print("Self events is: %s" % self.events)
            return list(self.events)
        found = itertools.dropwhile(lambda x: x.id_ != id_, self.events)
        # Skip the event carrying id_ itself; the default keeps an unknown
        # id from raising StopIteration.
        next(found, None)
        return list(found)

    def refresh(self):
        """Fake dynamic system: refresh remotes, tick, then pull from them.

        Remotes are refreshed recursively, so remotes that follow each other
        in a cycle would recurse without bound.
        """
        # Iterate over copies: pull() asserts a real list, and on Python 3
        # dict.keys() is a view rather than a list.
        for remote in list(self.remotes):
            remote.refresh()
        self.tick()
        self.pull(list(self.remotes))

    def tick(self):
        """(Abstract) Perform any steps necessary (fake asynchrony)."""
        pass
class DirectoryServiceEvent(object):
    """Common base class of the events published by the directory service."""
class FileCreated(DirectoryServiceEvent):
    """Event: a file named ``filename`` showed up in the watched directory."""

    def __init__(self, filename):
        # Name of the file that appeared.
        self.filename = filename
class FileRemoved(DirectoryServiceEvent):
    """Event: the file named ``filename`` is gone from the watched directory."""

    def __init__(self, filename):
        # Name of the file that disappeared.
        self.filename = filename
import os, time | |
class DirectoryService(Aggregate):
    """Sample aggregate that scans a directory and publishes file events.

    Every ``scan`` compares the directory listing with the files known so
    far and records a ``FileCreated`` / ``FileRemoved`` event for each
    difference.

    ``path_to_watch`` is the directory to scan; it defaults to the current
    directory, which used to be hard-coded.
    """

    def __init__(self, path_to_watch="."):
        super(DirectoryService, self).__init__()
        self.path_to_watch = path_to_watch
        # Filenames currently known to exist, maintained by commit().
        self.files = []

    def tick(self):
        """Rescan the watched directory."""
        self.scan()

    def scan(self):
        """List the watched directory and add events for every change."""
        before = set(self.files)
        after = set(os.listdir(self.path_to_watch))
        # Sorted so that the order of the generated events is deterministic.
        added = [FileCreated(f) for f in sorted(after - before)]
        removed = [FileRemoved(f) for f in sorted(before - after)]
        self.add(added)
        self.add(removed)

    def commit(self, event):
        """Apply a file event to ``self.files``."""
        if isinstance(event, FileCreated):
            # Guarded like the removal branch, so a repeated creation event
            # does not leave a duplicate entry behind.
            if event.filename not in self.files:
                self.files.append(event.filename)
        elif isinstance(event, FileRemoved):
            if event.filename in self.files:
                self.files.remove(event.filename)
class CollectionEvent(object):
    """Common base class of the events recorded by the collection service."""
class FileChanged(CollectionEvent):
    """Event: ``filename`` appeared (``appeared`` true) or vanished (false)."""

    def __init__(self, filename, appeared):
        self.filename, self.appeared = filename, appeared
class FileRemovedFromCollection(CollectionEvent):
    """Event: ``filename`` was taken out of ``collection``."""

    def __init__(self, filename, collection):
        self.filename, self.collection = filename, collection
class FileAddedToCollection(CollectionEvent):
    """Event: ``filename`` was put into ``collection``."""

    def __init__(self, filename, collection):
        self.filename, self.collection = filename, collection
class CollectionService(Aggregate):
    """Aggregate that groups files into named collections.

    A file can belong to several collections.  When the followed directory
    service reports that a file disappeared, the file is removed from every
    collection automatically.
    """

    def __init__(self, directory_service):
        super(CollectionService, self).__init__()
        # Collections and the files they contain: name -> list of filenames.
        self.collections = {}
        # Reverse map: filename -> list of collections the file is in.
        self.files = {}
        # Filenames reported as existing by the directory service.
        self.valid_filenames = []
        self.directory_service = directory_service
        self.add_remote(directory_service)

    def merge(self, event):
        """Translate a directory-service event into local events."""
        if isinstance(event, FileRemoved):
            self.add(FileChanged(event.filename, False))
            # A vanished file leaves every collection it was part of.
            # Iterate over a copy: commit() mutates the underlying list.
            for collection_in in list(self.files.get(event.filename, [])):
                self.add(FileRemovedFromCollection(event.filename, collection_in))
        elif isinstance(event, FileCreated):
            self.add(FileChanged(event.filename, True))
        print("Got update from remote: %s" % event)
        print(self)

    def __str__(self):
        return "%s\n%s" % (self.valid_filenames, self.collections)

    def commit(self, event):
        """Apply a local event to the collection maps and the valid names."""
        print("Commiting event: %s " % event)
        if isinstance(event, FileRemovedFromCollection):
            col = event.collection
            fname = event.filename
            # .get() keeps an event that names an unknown file or collection
            # from raising KeyError; such an event is simply a no-op.
            collections_of_file = self.files.get(fname, [])
            if col in collections_of_file:
                collections_of_file.remove(col)
            files_in_collection = self.collections.get(col, [])
            if fname in files_in_collection:
                files_in_collection.remove(fname)
        elif isinstance(event, FileAddedToCollection):
            members = self.collections.setdefault(event.collection, [])
            if event.filename not in members:
                members.append(event.filename)
            owners = self.files.setdefault(event.filename, [])
            if event.collection not in owners:
                owners.append(event.collection)
        elif isinstance(event, FileChanged):
            if event.appeared:
                self.valid_filenames.append(event.filename)
            elif event.filename in self.valid_filenames:
                self.valid_filenames.remove(event.filename)
        print(self)

    def remove_from_collection(self, file, collection):
        """Remove ``file`` from ``collection`` if it is currently in it.

        Only the named collection is affected.  (The ``collection`` argument
        used to be ignored, which removed the file from every collection.)
        """
        if collection in self.files.get(file, []):
            self.add(FileRemovedFromCollection(file, collection))

    def put_into_collection(self, file, collection):
        """Put ``file`` into ``collection`` unless it is already there.

        Files that are not known to exist are rejected with an error message.
        """
        if file not in self.valid_filenames:
            print("[ERROR] File %s does not seem to exist... Try again later." % file)
            return
        if file not in self.collections.get(collection, []):
            self.add(FileAddedToCollection(file, collection))
def main():
    """Do a sample run of the system.

    Scans the current directory once and prints the resulting events, then
    starts a collection service and loops forever, refreshing it every
    couple of seconds.  Add or remove files in the current directory while
    it runs to see events being generated.
    """
    import time

    directory = DirectoryService()
    print("Before scan...")
    print(directory.files)
    directory.scan()

    print("Pulling...")
    for record in directory.log():
        filename = record.value.filename
        print("%s the file: %s" % (repr(record), filename))

    print("After scan...")
    print(directory.files)

    print("Starting collection service")
    service = CollectionService(directory)

    def show_collections():
        # Dump the current collection -> files mapping.
        print("Current collection-files are:")
        print(service.collections)

    print("Putting file 'sample.py' into 'My' collection")
    service.put_into_collection('sample.py', 'My')
    show_collections()

    service.put_into_collection('sample1.py', 'My')
    show_collections()

    # Poll forever; each round picks up directory changes and then toggles
    # 'baker.txt' in and out of the 'My' collection.
    while True:
        time.sleep(1)
        service.refresh()
        service.put_into_collection('baker.txt', 'My')
        time.sleep(1)
        service.remove_from_collection('baker.txt', 'My')
# Start the demo only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment