Last active
July 26, 2021 04:46
-
-
Save kjellm/ec8fbaac65a28d67f17d941cc454f0f1 to your computer and use it in GitHub Desktop.
Event Source proof of concept. Copyright 2017 Kjell-Magne Øierud. License: MIT https://opensource.org/licenses/MIT
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative 'base' | |
require_relative 'event' | |
require_relative 'cmd' | |
require_relative 'crud' | |
require_relative 'model' | |
require_relative 'read' | |
require 'pp' | |
# Demo driver for the event-sourcing proof of concept.
#
# Runs a fixed series of create/update commands against the Recording and
# Release aggregates, then prints the event store and the projections.
class Application < BaseObject
  # Generates the ids the demo commands refer to and sets up the projections
  # before any command is run.
  def initialize
    @recording_id = UUID.generate
    @release_id = UUID.generate
    initialize_projections
  end

  # Executes the demo commands, then dumps the event store and the
  # projections to stdout.
  def main
    puts "LOGG ---------------------------------------------------------"
    run_commands
    puts
    puts "EVENT STORE ------------------------------------------------"
    pp registry.event_store
    puts
    puts "PROJECTIONS ------------------------------------------------"
    peek_at_projections
  end

  private

  # Instantiates the three projections; the release projection is handed the
  # recording projection so it can look recordings up.
  def initialize_projections
    @the_recording_projection = RecordingProjection.new
    @the_release_projection = ReleaseProjection.new(@the_recording_projection)
    @the_totals_projection = TotalsProjection.new
    @projections = [
      @the_release_projection,
      @the_recording_projection,
      @the_totals_projection,
    ]
  end

  # Prints the projected release, the projected recording and the totals.
  def peek_at_projections
    p @the_release_projection.find @release_id
    p @the_recording_projection.find @recording_id
    p @the_totals_projection.totals
  end

  # Issues the demo commands in order; the last one is expected to fail.
  def run_commands
    recording_data = {id: @recording_id, title: "Sledge Hammer",
                      artist: "Peter Gabriel", duration: 313}
    run(recording_data, CreateRecording, Recording)

    run({id: @release_id, title: "So", tracks: []},
        CreateRelease, Release)
    run({id: UUID.generate, title: "Shaking The Tree",
         tracks: [@recording_id]},
        CreateRelease, Release)
    run({id: @release_id, title: "So", tracks: [@recording_id]},
        UpdateRelease, Release)

    run(recording_data.merge({ title: "Sledgehammer" }),
        UpdateRecording, Recording)

    # Some failing commands, look in log for verification of failure
    run({id: "Non-existing ID", title: "Foobar"},
        UpdateRecording, Recording)
  end

  # Builds a command of +command_class+ from +request_data+ and hands it to
  # the command handler registered for +aggregate+. Any StandardError is
  # logged instead of propagated so the demo keeps running.
  def run(request_data, command_class, aggregate)
    logg "Incoming request with data: #{request_data.inspect}"
    command_handler = registry.command_handler_for(aggregate)
    command = command_class.new(request_data)
    command_handler.handle command
  rescue StandardError => e
    # `command` is still nil when the command's constructor itself raised
    # (e.g. validation failure), so fall back to the class name to keep the
    # log line informative.
    logg "ERROR: Command #{command || command_class} failed because of: #{e}"
  end
end

Application.new.main
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'date'
require 'delegate'
require 'securerandom'
require 'set'
class String
  # Converts a CamelCase identifier to snake_case.
  #
  # A run of capitals is treated as one word, so "HTTPServer" becomes
  # "http_server" rather than "h_t_t_p_server". Plain CamelCase names such as
  # "CreateRecording" still become "create_recording".
  #
  # @return [String] a new lower-cased, underscore-separated string
  def snake_case
    gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2') # "HTTPServer" -> "HTTP_Server"
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')  # "CreateRecording" -> "Create_Recording"
      .downcase
  end
end
class Hash
  # Fallback implementation of Hash#slice for Rubies that do not ship one.
  # It is only defined when missing, so the built-in method (Ruby 2.5+) is
  # never overridden.
  unless method_defined?(:slice)
    # Returns a new hash containing only the entries whose keys are listed.
    #
    # @param keys [Array<Object>] keys to keep; keys not present are ignored
    # @return [Hash] a hash of the receiver's class with the selected entries
    def slice(*keys)
      keys.each_with_object(self.class.new) do |key, picked|
        picked[key] = self[key] if key?(key)
      end
    end
  end
end
# Helpers for generating UUID strings and converting them to and from
# integers.
module UUID
  # @return [String] a random UUID as produced by SecureRandom.uuid
  def self.generate
    SecureRandom.uuid
  end

  # Interprets the hex digits of +uuid+ (dashes removed) as one integer.
  #
  # @param uuid [String] dash-separated hexadecimal UUID
  # @return [Integer]
  # @raise [ArgumentError] if the remaining text is not valid hexadecimal
  def self.as_int(uuid)
    Integer(uuid.delete("-"), 16)
  end

  # Formats +int+ as a zero-padded 8-4-4-4-12 hexadecimal UUID string.
  #
  # @param int [Integer] value in the range 0...2**128
  # @return [String]
  # @raise [ArgumentError] if +int+ is not an Integer in that range; such
  #   values cannot be represented in 32 hex digits
  def self.from_int(int)
    unless int.is_a?(Integer) && int.between?(0, (1 << 128) - 1)
      raise ArgumentError, "UUID integer out of range: #{int.inspect}"
    end

    hex = format("%032x", int)
    [hex[0, 8], hex[8, 4], hex[12, 4], hex[16, 4], hex[20, 12]].join("-")
  end
end
# Class-level macro for declaring an object's attributes.
module Attributes
  # Declares readers for +names+, exposes them through a singleton
  # +attribute_names+ method, and mixes in an +initialize+ that assigns the
  # attributes it is given.
  #
  # The generated initializer accepts the attributes either as keywords
  # (`Klass.new(id: 1)`) or as a single positional Hash (`Klass.new(hash)`).
  # The positional form is needed because callers pass plain hashes, which
  # Ruby 3 no longer converts to keywords implicitly.
  #
  # @param names [Array<Symbol>] attribute names
  # @return [Array<Symbol>] the declared names
  def attributes(*names)
    attr_reader(*names)
    define_singleton_method(:attribute_names) { names }

    initializer = Module.new do
      define_method :initialize do |positional = nil, **attrs|
        unless positional.nil?
          unless positional.respond_to?(:to_hash)
            raise ArgumentError, "Expected a Hash of attributes, got: #{positional.inspect}"
          end

          attrs = positional.to_hash.merge(attrs)
        end

        attrs.each do |name, value|
          raise ArgumentError, "Unrecognized argument: #{name}" unless names.include?(name)

          # Prefer a writer (public or private) when the class defines one.
          if respond_to?("#{name}=", true)
            send("#{name}=", value)
          else
            instance_variable_set("@#{name}", value)
          end
        end
        super(**attrs)
      end
    end
    include initializer

    names
  end
end
# Root class of the object model: provides the `attributes` macro, logging,
# access to the shared registry, and a hash view of declared attributes.
class BaseObject
  extend Attributes

  # Accepts and ignores any arguments so subclasses can always call super.
  def initialize(*_args); end

  # Helpers available both on instances and on the classes themselves.
  module ClassAndInstanceMethods
    # Prints a timestamp followed by +args+ and a newline.
    def logg(*args)
      print("#{DateTime.now} - ", *args)
      puts
    end

    # Lazily created Registry, stored in a class variable.
    def registry
      @@registry ||= Registry.new
    end
  end

  include ClassAndInstanceMethods
  extend ClassAndInstanceMethods

  # @return [Hash] declared attribute names mapped to their current values
  def to_h
    self.class.attribute_names.each_with_object({}) do |name, hash|
      hash[name] = send(name)
    end
  end
end
# Service locator: hands out command handlers, the event store and
# repositories.
class Registry < BaseObject
  # Returns a logging decorator around the handler for +klass+. A +klass+
  # that itself responds to `handle` is used directly; otherwise a new
  # "<klass>CommandHandler" instance is created.
  def command_handler_for(klass)
    handler =
      klass.respond_to?(:handle) ? klass : self.class.const_get("#{klass}CommandHandler").new
    CommandHandlerLoggDecorator.new(handler)
  end

  # Memoized event store, wrapped (innermost first) in the pub/sub, logging
  # and optimistic-lock decorators.
  def event_store
    @event_store ||= begin
      publishing = EventStorePubSubDecorator.new(EventStore.new)
      logging = EventStoreLoggDecorator.new(publishing)
      EventStoreOptimisticLockDecorator.new(logging)
    end
  end

  # Returns +klass+ itself when it is a CrudAggregate, otherwise a new
  # "<klass>Repository" instance.
  def repository_for(klass)
    return klass if klass < CrudAggregate

    self.class.const_get("#{klass}Repository").new
  end
end
# Base class for objects whose attributes, apart from :id, can be updated.
class Entity < BaseObject
  # Overwrites every declared attribute except :id whose name is a key in
  # +attrs+; attributes missing from +attrs+ are left unchanged.
  #
  # @param attrs [Hash{Symbol => Object}]
  def set_attributes(attrs)
    updatable = self.class.attribute_names - [:id]
    updatable.each do |name|
      next unless attrs.key?(name)

      instance_variable_set(:"@#{name}", attrs[name])
    end
  end
end
# Common ancestor of Command and Event; adds nothing to BaseObject itself.
class ValueObject < BaseObject; end
# Base error for failures reported by the event store.
class EventStoreError < StandardError; end
# Raised when a stream's version differs from the version the writer expected.
class EventStoreConcurrencyError < EventStoreError; end
# Base class for all domain events.
class Event < ValueObject; end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Small assertion helpers for validating command input. Each helper returns
# true on success and raises a bare ArgumentError on failure.
module Validations
  # Fails if any of +values+ is nil.
  def required(*values)
    raise ArgumentError if values.any?(&:nil?)

    true
  end

  # Fails unless +obj+ is a String with non-whitespace content.
  # A nil/false +obj+ is skipped (returns nil).
  def non_blank_string(obj)
    return nil unless obj

    acceptable = obj.is_a?(String) && !obj.strip.empty?
    raise ArgumentError unless acceptable

    true
  end

  # Fails unless +obj+ is an Integer greater than zero.
  # A nil/false +obj+ is skipped (returns nil).
  def positive_integer(obj)
    return nil unless obj

    acceptable = obj.is_a?(Integer) && obj > 0
    raise ArgumentError unless acceptable

    true
  end
end
# Base class for commands: a command validates itself as soon as it has been
# constructed.
class Command < ValueObject
  include Validations

  # Forwards all arguments up the initializer chain, then runs #validate.
  def initialize(*args)
    super(*args)
    validate
  end

  # Hook for subclasses; the default implementation always raises.
  def validate
    raise RuntimeError, "Implement in subclass! #{self.class.name}"
  end
end
# Base command handler: dispatches a command to the method named after the
# command's class.
class CommandHandler < BaseObject
  module InstanceMethods
    # Processes +command+ and deliberately returns nil.
    def handle(command)
      process(command)
      nil
    end

    # Calls "process_<snake_cased command class name>" with +command+.
    # `send` is used so that private process_* methods can be reached.
    def process(command)
      send(:"process_#{command.class.name.snake_case}", command)
    end
  end

  include InstanceMethods
end
# Delegator that logs before and after a wrapped command handler runs.
# Construction is inherited unchanged from the delegate class: it takes the
# handler to wrap.
class CommandHandlerLoggDecorator < DelegateClass(CommandHandler)
  # Logs the command, delegates to the wrapped handler, and logs completion
  # even when the handler raises.
  def handle(command)
    logg "Start handling: #{command.inspect}"
    super(command)
  ensure
    logg "Done handling: #{command.class.name}"
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Command handler providing generic create/update processing. Users supply
# `type`, `repository` and `validator`.
class CrudCommandHandler < CommandHandler
  module InstanceMethods
    private

    # Hook: return an object responding to `assert_validity` for +obj+.
    def validator(obj)
      raise "Implement in subclass!"
    end

    # Hook: return the repository used for lookups and units of work.
    def repository
      raise "Implement in subclass!"
    end

    # Hook: return the aggregate class being handled.
    def type
      raise "Implement in subclass!"
    end

    # Builds and validates a new aggregate from +command+, then creates its
    # stream and appends a "<type>Created" event.
    def process_create(command)
      repository.unit_of_work(command.id) do |uow|
        candidate = type.new(command.to_h)
        validator(candidate).assert_validity

        created_event_class = self.class.const_get("#{type}Created")
        event = created_event_class.new(command.to_h)

        uow.create
        uow.append(event)
      end
    end

    # Loads the aggregate, applies the command's attributes, validates it and
    # appends a "<type>Updated" event. Raises ArgumentError if no aggregate
    # is found for the command's id.
    def process_update(command)
      repository.unit_of_work(command.id) do |uow|
        existing = repository.find(command.id)
        raise ArgumentError if existing.nil?

        existing.set_attributes(command.to_h)
        validator(existing).assert_validity

        updated_event_class = self.class.const_get("#{type}Updated")
        uow.append(updated_event_class.new(command.to_h))
      end
    end
  end

  include InstanceMethods
end
# Mixin that turns the including class into its own command handler,
# repository and validator.
module CrudAggregate
  module ClassMethods
    # The class acts as its own repository.
    def repository
      self
    end

    # The object validates itself (see #assert_validity).
    def validator(obj)
      obj
    end
  end

  # Default validity check: does nothing. Including classes may override it.
  def assert_validity; end

  # Extends the including class with the handler, CRUD and repository
  # behavior, then defines the name-specific dispatch targets
  # (process_create_<name>, process_update_<name>, apply_<name>_updated).
  def self.included(othermod)
    [
      CommandHandler::InstanceMethods,
      CrudCommandHandler::InstanceMethods,
      EventStoreRepository::InstanceMethods,
      ClassMethods,
    ].each { |mixin| othermod.extend(mixin) }

    aggregate_name = othermod.name.snake_case

    othermod.define_singleton_method("type") { othermod }

    %w[create update].each do |action|
      othermod.define_singleton_method("process_#{action}_#{aggregate_name}") do |command|
        send("process_#{action}", command)
      end
    end

    othermod.define_singleton_method("apply_#{aggregate_name}_updated") do |obj, event|
      obj.set_attributes(event.to_h)
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Ordered, append-only sequence of events for one aggregate.
class EventStream < BaseObject
  def initialize
    @event_sequence = []
  end

  # Gives copies made with #clone / #dup their own event array. Without this
  # the copy would share the array with the original, so appending to the
  # copy would also modify the original stream.
  def initialize_copy(source)
    super
    @event_sequence = @event_sequence.dup
  end

  # @return [Integer] number of events in the stream
  def version
    @event_sequence.length
  end

  # Appends +events+ to the end of the stream.
  def append(*events)
    @event_sequence.push(*events)
  end

  # @return [Array] a copy of the events, oldest first
  def to_a
    @event_sequence.clone
  end
end
# In-memory event store: one EventStream per id.
class EventStore < BaseObject
  def initialize
    @streams = {}
  end

  # Creates an empty stream for +id+.
  # @raise [EventStoreError] if a stream already exists for +id+
  def create(id)
    if @streams.key?(id)
      raise EventStoreError, "Stream exists for #{id}"
    end

    @streams[id] = EventStream.new
  end

  # Appends +events+ to the stream for +id+ (KeyError if there is none).
  def append(id, *events)
    stream = @streams.fetch(id)
    stream.append(*events)
  end

  # @return [EventStream, nil] a clone of the stream for +id+, if any
  def event_stream_for(id)
    stream = @streams[id]
    stream.nil? ? nil : stream.clone
  end

  # @return [Integer] the stream's version, or 0 when there is no stream
  def event_stream_version_for(id)
    stream = @streams[id]
    (stream.nil? ? nil : stream.version) || 0
  end
end
# Event store decorator that rejects appends made against a stale stream
# version. Appends to a given stream are serialized with a per-stream Mutex.
class EventStoreOptimisticLockDecorator < DelegateClass(EventStore)
  def initialize(obj)
    super
    @locks = {}
  end

  # Creates the stream, then registers its lock. The lock is assigned only
  # after the wrapped store accepted the creation, so a failed duplicate
  # `create` cannot replace the Mutex of an existing stream.
  #
  # @return whatever the wrapped store's `create` returns
  def create(id)
    stream = super
    @locks[id] = Mutex.new
    stream
  end

  # Appends +events+ to the stream for +id+ if its current version equals
  # +expected_version+.
  #
  # @raise [EventStoreError] if no stream was created for +id+ through this
  #   decorator
  # @raise [EventStoreConcurrencyError] if the stream's version differs from
  #   +expected_version+
  def append(id, expected_version, *events)
    lock = @locks.fetch(id) { raise EventStoreError, "No stream exists for #{id}" }
    lock.synchronize do
      unless event_stream_version_for(id) == expected_version
        raise EventStoreConcurrencyError
      end

      super(id, *events)
    end
  end
end
# Event store decorator that forwards every appended event to its
# subscribers.
class EventStorePubSubDecorator < DelegateClass(EventStore)
  def initialize(obj)
    super
    @subscribers = []
  end

  # Registers +subscriber+; it must respond to `apply(event)`.
  def add_subscriber(subscriber)
    @subscribers << subscriber
  end

  # Stores the events first, then publishes them.
  def append(id, *events)
    super(id, *events)
    publish(*events)
  end

  # Calls `apply` on every subscriber for each event, in event order.
  def publish(*events)
    events.each do |event|
      @subscribers.each { |subscriber| subscriber.apply(event) }
    end
  end
end
# Event store decorator that logs the events after they have been appended.
class EventStoreLoggDecorator < DelegateClass(EventStore)
  def append(id, *events)
    super(id, *events)
    logg("New events: #{events}")
  end
end
# Binds an event store to one stream id and remembers the stream version
# seen at construction time, which is sent along with every append.
class UnitOfWork < BaseObject
  def initialize(event_store, id)
    @event_store = event_store
    @id = id
    @expected_version = @event_store.event_stream_version_for(@id)
  end

  # Creates the stream for this unit of work's id.
  def create
    @event_store.create(@id)
  end

  # Appends +events+, passing the version recorded at construction.
  def append(*events)
    @event_store.append(@id, @expected_version, *events)
  end
end
# Repository that rebuilds objects from their event streams. Users supply
# `type` and the "apply_<event name>" methods.
class EventStoreRepository < BaseObject
  module InstanceMethods
    # Rebuilds the object identified by +id+ from its events.
    #
    # @return [Object, nil] the rebuilt object, or nil when there is no
    #   stream for +id+ or the stream holds no events yet
    def find(id)
      stream = registry.event_store.event_stream_for(id)
      return if stream.nil?

      events = stream.to_a
      # A stream that was created but never appended to has nothing to build from.
      return if events.empty?

      build(events)
    end

    # Yields a UnitOfWork bound to the event store and +id+.
    def unit_of_work(id)
      yield UnitOfWork.new(registry.event_store, id)
    end

    private

    # Instantiates `type` from the first event's attributes, then applies
    # each later event through "apply_<snake_cased event class name>".
    def build(stream)
      first_event, *later_events = stream
      obj = type.new(first_event.to_h)
      later_events.each do |event|
        send(:"apply_#{event.class.name.snake_case}", obj, event)
      end
      obj
    end
  end

  include InstanceMethods
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# R E L E A S E | |
# | |
# a.k.a. Album | |
# | |
# Shows an example of using CrudAggregate, where everything is rolled into
# one class. Useful for the simplest aggregates that only need CRUD
# operations.
# | |
# Attribute names shared by the Release entity, its commands and its events.
# Frozen so the shared list cannot be modified by accident.
RELEASE_ATTRIBUTES = %i[id title tracks].freeze
# Release aggregate; CrudAggregate makes the class its own command handler,
# repository and validator.
class Release < Entity
  attributes(*RELEASE_ATTRIBUTES)
  include CrudAggregate

  # Validation hook; currently accepts everything.
  def assert_validity
    # Do something here
  end
end
# Shared validation for the release commands.
class ReleaseCommand < Command
  private

  # Every release attribute must be present and the title must be a
  # non-blank string.
  def validate
    values = RELEASE_ATTRIBUTES.map { |attribute| send(attribute) }
    required(*values)
    non_blank_string(title)
  end
end
# Command requesting the creation of a release.
class CreateRelease < ReleaseCommand
  attributes(*RELEASE_ATTRIBUTES)
end
# Event recorded when a release has been created.
class ReleaseCreated < Event
  attributes(*RELEASE_ATTRIBUTES)
end
# Command requesting an update of an existing release.
class UpdateRelease < ReleaseCommand
  attributes(*RELEASE_ATTRIBUTES)
end
# Event recorded when a release has been updated.
class ReleaseUpdated < Event
  attributes(*RELEASE_ATTRIBUTES)
end
# | |
# R E C O R D I N G | |
# | |
# Shows an example where all the different responsibilities are | |
# handled by separate objects. | |
# | |
# Event-store backed repository for Recording objects.
class RecordingRepository < EventStoreRepository
  # Class instantiated when rebuilding from events.
  def type; Recording; end

  # Applies a RecordingUpdated event by copying its attributes onto
  # +recording+.
  def apply_recording_updated(recording, event)
    recording.set_attributes(event.to_h)
  end
end
# Validator for recordings; a placeholder that currently accepts everything.
class RecordingValidator < BaseObject
  # The object to validate is accepted but not used yet.
  def initialize(obj); end

  def assert_validity
    # Do something here
  end
end
# Command handler for recordings, wiring the generic CRUD processing to the
# Recording type, its repository and its validator.
class RecordingCommandHandler < CrudCommandHandler
  private

  def type
    Recording
  end

  # Repository obtained from the registry, memoized per handler instance.
  def repository
    @repository ||= registry.repository_for(Recording)
  end

  def validator(obj)
    RecordingValidator.new(obj)
  end

  # Dispatch target for CreateRecording commands.
  def process_create_recording(command)
    process_create(command)
  end

  # Dispatch target for UpdateRecording commands.
  def process_update_recording(command)
    process_update(command)
  end
end
# Attribute names shared by the Recording entity, its commands and its
# events. Frozen so the shared list cannot be modified by accident.
RECORDING_ATTRIBUTES = %i[id title artist duration].freeze
# Shared validation for the recording commands.
class RecordingCommand < Command
  private

  # Every recording attribute must be present; title and artist must be
  # non-blank strings and duration a positive integer.
  def validate
    values = RECORDING_ATTRIBUTES.map { |attribute| send(attribute) }
    required(*values)
    non_blank_string(title)
    non_blank_string(artist)
    positive_integer(duration)
  end
end
# Command requesting the creation of a recording.
class CreateRecording < RecordingCommand
  attributes(*RECORDING_ATTRIBUTES)
end
# Event recorded when a recording has been created.
class RecordingCreated < Event
  attributes(*RECORDING_ATTRIBUTES)
end
# Command requesting an update of an existing recording.
class UpdateRecording < RecordingCommand
  attributes(*RECORDING_ATTRIBUTES)
end
# Event recorded when a recording has been updated.
class RecordingUpdated < Event
  attributes(*RECORDING_ATTRIBUTES)
end
# Recording entity; a plain attribute holder.
class Recording < Entity
  attributes(*RECORDING_ATTRIBUTES)
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Projection that answers queries by reading from the repository of its
# `type` instead of keeping its own state.
class RepositoryProjection < BaseObject
  def initialize
    @repository = registry.repository_for(type)
  end

  # @return [Hash] the found object converted with `to_h`
  def find(id)
    found = @repository.find(id)
    found.to_h
  end

  # Events are ignored; nothing is stored here.
  def apply(*_args); end

  private

  # Hook: subclasses return the class to look up.
  def type
    raise "Implement in subclass! #{self.class.name}"
  end
end
# Repository-backed projection of recordings.
class RecordingProjection < RepositoryProjection
  def type; Recording; end
end
# Projection that keeps its own store, updated from the events published by
# the event store it subscribes to.
class SubscriberProjection < BaseObject
  def initialize
    @store = {}
    registry.event_store.add_subscriber(self)
  end

  # @return a clone of the entry stored for +id+, or nil if there is none
  def find(id)
    entry = @store[id]
    entry.nil? ? nil : entry.clone
  end

  # Dispatches +event+ to the public "when_<snake_cased event class name>"
  # method when one exists; other events are ignored.
  def apply(event)
    handler_name = :"when_#{event.class.name.snake_case}"
    return unless respond_to?(handler_name)

    send(handler_name, event)
  end
end
# Read model of releases. Each stored release is a hash in which the track
# ids are replaced by the recording data and an :artist entry is derived
# from the tracks.
class ReleaseProjection < SubscriberProjection
  # @param recordings an object whose `find(id)` returns recording data
  #   convertible with `to_h`
  def initialize(recordings)
    super()
    @recordings = recordings
  end

  # Stores the projected release under the event's id.
  def when_release_created(event)
    release = build_release_from_event_data event
    @store[event.id] = release
  end

  # Merges the projected update into the stored release.
  def when_release_updated(event)
    release = build_release_from_event_data event
    @store[event.id].merge! release
  end

  # A changed recording may be a track on any release, so all are refreshed.
  def when_recording_updated(_event)
    refresh_all_tracks
  end

  private

  # Turns the event into a release hash with expanded tracks and :artist.
  def build_release_from_event_data(event)
    release = event.to_h
    release[:tracks] = track_id_to_data release.fetch(:tracks)
    derive_artist_from_tracks(release)
    release
  end

  # Looks up the recording data for every id in +track_ids+.
  def track_id_to_data(track_ids)
    track_ids.map { |id| @recordings.find(id).to_h }
  end

  # Re-reads the track data of every stored release and re-derives :artist,
  # since a recording update may have changed the artist. Track entries
  # without an :id (empty placeholders left by a failed lookup) cannot be
  # looked up again and are dropped rather than raising.
  def refresh_all_tracks
    @store.each_value do |release|
      track_ids = release.fetch(:tracks).map { |track| track[:id] }.compact
      release[:tracks] = track_id_to_data(track_ids)
      derive_artist_from_tracks(release)
    end
  end

  # Sets :artist to the tracks' common artist, or "Various artists" when the
  # tracks do not share exactly one artist.
  def derive_artist_from_tracks(release)
    artists = release[:tracks].map { |rec| rec[:artist] }.uniq
    release[:artist] = artists.length == 1 ? artists.first : "Various artists"
  end
end
# Counts creation events, keyed by event class.
class TotalsProjection < SubscriberProjection
  # @return [Hash{Class => Integer}] number of events seen per event class
  attr_reader :totals

  def initialize
    super
    @totals = Hash.new(0)
  end

  def when_recording_created(event)
    handle_create_event(event)
  end

  def when_release_created(event)
    handle_create_event(event)
  end

  private

  # Increments the counter for the event's class.
  def handle_create_event(event)
    @totals[event.class] += 1
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@sbellware: Thanks for sharing, looks interesting :-)