Skip to content

Instantly share code, notes, and snippets.

@kjellm
Last active July 26, 2021 04:46
Show Gist options
  • Save kjellm/ec8fbaac65a28d67f17d941cc454f0f1 to your computer and use it in GitHub Desktop.
Save kjellm/ec8fbaac65a28d67f17d941cc454f0f1 to your computer and use it in GitHub Desktop.
Event Source proof of concept. Copyright 2017 Kjell-Magne Øierud. License: MIT https://opensource.org/licenses/MIT
require_relative 'base'
require_relative 'event'
require_relative 'cmd'
require_relative 'crud'
require_relative 'model'
require_relative 'read'
require 'pp'
class Application < BaseObject
def initialize
@recording_id = UUID.generate
@release_id = UUID.generate
initialize_projections
end
def main
puts "LOGG ---------------------------------------------------------"
run_commands
puts
puts "EVENT STORE ------------------------------------------------"
pp registry.event_store
puts
puts "PROJECTIONS ------------------------------------------------"
peek_at_projections
end
private
def initialize_projections
@the_recording_projection = RecordingProjection.new
@the_release_projection = ReleaseProjection.new(@the_recording_projection)
@the_totals_projection = TotalsProjection.new
@projections = [
@the_release_projection,
@the_recording_projection,
@the_totals_projection,
]
end
def peek_at_projections
p @the_release_projection.find @release_id
p @the_recording_projection.find @recording_id
p @the_totals_projection.totals
end
def run_commands
recording_data = {id: @recording_id, title: "Sledge Hammer",
artist: "Peter Gabriel", duration: 313}
run(recording_data, CreateRecording, Recording)
run({id: @release_id, title: "So", tracks: []},
CreateRelease, Release)
run({id: UUID.generate, title: "Shaking The Tree",
tracks: [@recording_id]},
CreateRelease, Release)
run({id: @release_id, title: "So", tracks: [@recording_id]},
UpdateRelease, Release)
run(recording_data.merge({ title: "Sledgehammer" }),
UpdateRecording, Recording)
# Some failing commands, look in log for verification of failure
run({id: "Non-existing ID", title: "Foobar"},
UpdateRecording, Recording)
end
def run(request_data, command_class, aggregate)
logg "Incoming request with data: #{request_data.inspect}"
command_handler = registry.command_handler_for(aggregate)
command = command_class.new(request_data)
command_handler.handle command
rescue StandardError => e
logg "ERROR: Command #{command} failed because of: #{e}"
end
end
Application.new.main
require 'set'
require 'date'
require 'securerandom'
class String
def snake_case
split(/(?=[A-Z]+)/).map(&:downcase).join("_")
end
end
class Hash
def slice(*keys)
keys.each_with_object(self.class.new) do
|k, hash| hash[k] = self[k] if has_key?(k)
end
end
end
module UUID
def self.generate
SecureRandom.uuid
end
def self.as_int(uuid)
Integer(uuid.split("-").join, 16)
end
def self.from_int(int)
int.to_s(16).rjust(32, '0').split(/(\h{8})(\h{4})(\h{4})(\h{4})(\h{12})/)[1..-1].join("-")
end
end
module Attributes
def attributes(*names)
attr_reader(*names)
define_singleton_method(:attribute_names) { names }
mod = Module.new do
define_method :initialize do |**attrs|
attrs.each do |name, value|
raise ArgumentError.new "Unrecognized argument: #{name}" unless names.include? name
if respond_to? "#{name}=", true
send "#{name}=", value
else
instance_variable_set "@#{name}", value
end
end
super(**attrs)
end
end
include mod
names
end
end
class BaseObject
extend Attributes
def initialize(*_args)
end
module ClassAndInstanceMethods
def logg(*args)
print "#{DateTime.now} - ", *args
puts
end
def registry
@@registry ||= Registry.new
end
end
include ClassAndInstanceMethods
extend ClassAndInstanceMethods
def to_h
Hash[self.class.attribute_names.map {|name| [name, send(name)] }]
end
end
class Registry < BaseObject
def command_handler_for(klass)
handler = if klass.respond_to? :handle
klass
else
self.class.const_get("#{klass}CommandHandler").new
end
CommandHandlerLoggDecorator.new(handler)
end
def event_store
@event_store ||=
EventStoreOptimisticLockDecorator.new(
EventStoreLoggDecorator.new(
EventStorePubSubDecorator.new(
EventStore.new)))
end
def repository_for(klass)
if klass < CrudAggregate
klass
else
self.class.const_get("#{klass}Repository").new
end
end
end
class Entity < BaseObject
def set_attributes(attrs)
(self.class.attribute_names - [:id]).each do |name|
instance_variable_set(:"@#{name}", attrs[name]) if attrs.key?(name)
end
end
end
class ValueObject < BaseObject
end
class EventStoreError < StandardError
end
class EventStoreConcurrencyError < EventStoreError
end
class Event < ValueObject
end
module Validations
def required(*values)
values.none?(&:nil?) or
raise ArgumentError
end
def non_blank_string(obj)
return unless obj
obj.is_a?(String) && !obj.strip.empty? or
raise ArgumentError
end
def positive_integer(obj)
return unless obj
obj.is_a?(Integer) && obj > 0 or
raise ArgumentError
end
end
class Command < ValueObject
include Validations
def initialize(*args)
super
validate
end
def validate
raise "Implement in subclass! #{self.class.name}"
end
end
class CommandHandler < BaseObject
module InstanceMethods
def handle(command)
process(command)
return
end
def process(command)
message = "process_" + command.class.name.snake_case
send message.to_sym, command
end
end
include InstanceMethods
end
class CommandHandlerLoggDecorator < DelegateClass(CommandHandler)
def initialize(obj)
super obj
end
def handle(command)
logg "Start handling: #{command.inspect}"
super
ensure
logg "Done handling: #{command.class.name}"
end
end
class CrudCommandHandler < CommandHandler
module InstanceMethods
private
def validator(obj)
raise "Implement in subclass!"
end
def repository
raise "Implement in subclass!"
end
def type
raise "Implement in subclass!"
end
def process_create(command)
repository.unit_of_work(command.id) do |uow|
obj = type.new(command.to_h)
validator(obj).assert_validity
event = self.class.const_get("#{type}Created").new(command.to_h)
uow.create
uow.append event
end
end
def process_update(command)
repository.unit_of_work(command.id) do |uow|
obj = repository.find command.id
raise ArgumentError if obj.nil?
obj.set_attributes command.to_h
validator(obj).assert_validity
event = self.class.const_get("#{type}Updated").new(command.to_h)
uow.append event
end
end
end
include InstanceMethods
end
module CrudAggregate
module ClassMethods
def repository
self
end
def validator(obj)
obj
end
end
def assert_validity
end
def self.included(othermod)
othermod.extend CommandHandler::InstanceMethods
othermod.extend CrudCommandHandler::InstanceMethods
othermod.extend EventStoreRepository::InstanceMethods
othermod.extend ClassMethods
othermod_name = othermod.name.snake_case
othermod.define_singleton_method("type") { othermod }
othermod.define_singleton_method "process_create_#{othermod_name}" do |command|
process_create command
end
othermod.define_singleton_method "process_update_#{othermod_name}" do |command|
process_update command
end
othermod.define_singleton_method("apply_#{othermod_name}_updated") do |obj, event|
obj.set_attributes(event.to_h)
end
end
end
class EventStream < BaseObject
def initialize
@event_sequence = []
end
def version
@event_sequence.length
end
def append(*events)
@event_sequence.push(*events)
end
def to_a
@event_sequence.clone
end
end
class EventStore < BaseObject
def initialize
@streams = {}
end
def create(id)
raise EventStoreError, "Stream exists for #{id}" if @streams.key? id
@streams[id] = EventStream.new
end
def append(id, *events)
@streams.fetch(id).append(*events)
end
def event_stream_for(id)
@streams[id]&.clone
end
def event_stream_version_for(id)
@streams[id]&.version || 0
end
end
class EventStoreOptimisticLockDecorator < DelegateClass(EventStore)
def initialize(obj)
super
@locks = {}
end
def create(id)
@locks[id] = Mutex.new
super
end
def append(id, expected_version, *events)
@locks[id].synchronize do
event_stream_version_for(id) == expected_version or
raise EventStoreConcurrencyError
super id, *events
end
end
end
class EventStorePubSubDecorator < DelegateClass(EventStore)
def initialize(obj)
super
@subscribers = []
end
def add_subscriber(subscriber)
@subscribers << subscriber
end
def append(id, *events)
super
publish(*events)
end
def publish(*events)
events.each do |e|
@subscribers.each do |sub|
sub.apply e
end
end
end
end
class EventStoreLoggDecorator < DelegateClass(EventStore)
def append(id, *events)
super
logg "New events: #{events}"
end
end
class UnitOfWork < BaseObject
def initialize(event_store, id)
@id = id
@event_store = event_store
@expected_version = event_store.event_stream_version_for(id)
end
def create
@event_store.create @id
end
def append(*events)
@event_store.append @id, @expected_version, *events
end
end
class EventStoreRepository < BaseObject
module InstanceMethods
def find(id)
stream = registry.event_store.event_stream_for(id)
return if stream.nil?
build stream.to_a
end
def unit_of_work(id)
yield UnitOfWork.new(registry.event_store, id)
end
private
def build(stream)
obj = type.new stream.first.to_h
stream[1..-1].each do |event|
message = "apply_" + event.class.name.snake_case
send message.to_sym, obj, event
end
obj
end
end
include InstanceMethods
end
#
# R E L E A S E
#
# a.k.a. Album
#
# Shows an example of using CrudAggregate. All stuff rolled into one
# class. Useful for the simplest aggregates that only needs CRUD
# operations.
#
RELEASE_ATTRIBUTES = %I(id title tracks)
class Release < Entity
attributes *RELEASE_ATTRIBUTES
include CrudAggregate
def assert_validity
# Do something here
end
end
class ReleaseCommand < Command
private
def validate
required(*RELEASE_ATTRIBUTES.map {|m| send m})
non_blank_string(title)
end
end
class CreateRelease < ReleaseCommand
attributes *RELEASE_ATTRIBUTES
end
class ReleaseCreated < Event
attributes *RELEASE_ATTRIBUTES
end
class UpdateRelease < ReleaseCommand
attributes *RELEASE_ATTRIBUTES
end
class ReleaseUpdated < Event
attributes *RELEASE_ATTRIBUTES
end
#
# R E C O R D I N G
#
# Shows an example where all the different responsibilities are
# handled by separate objects.
#
class RecordingRepository < EventStoreRepository
def type
Recording
end
def apply_recording_updated(recording, event)
recording.set_attributes(event.to_h)
end
end
class RecordingValidator < BaseObject
def initialize(obj)
end
def assert_validity
# Do something here
end
end
class RecordingCommandHandler < CrudCommandHandler
private
def type; Recording; end
def repository
@repository ||= registry.repository_for(Recording)
end
def validator(obj)
RecordingValidator.new(obj)
end
def process_create_recording(command)
process_create(command)
end
def process_update_recording(command)
process_update(command)
end
end
RECORDING_ATTRIBUTES = %I(id title artist duration)
class RecordingCommand < Command
private
def validate
required(*RECORDING_ATTRIBUTES.map {|m| send m})
non_blank_string(title)
non_blank_string(artist)
positive_integer(duration)
end
end
class CreateRecording < RecordingCommand
attributes *RECORDING_ATTRIBUTES
end
class RecordingCreated < Event
attributes *RECORDING_ATTRIBUTES
end
class UpdateRecording < RecordingCommand
attributes *RECORDING_ATTRIBUTES
end
class RecordingUpdated < Event
attributes *RECORDING_ATTRIBUTES
end
class Recording < Entity
attributes *RECORDING_ATTRIBUTES
end
class RepositoryProjection < BaseObject
def initialize
@repository = registry.repository_for type
end
def find(id)
@repository.find(id).to_h
end
def apply(*_args); end
private
def type
raise "Implement in subclass! #{self.class.name}"
end
end
class RecordingProjection < RepositoryProjection
def type
Recording
end
end
class SubscriberProjection < BaseObject
def initialize
@store = {}
registry.event_store.add_subscriber(self)
end
def find(id)
@store[id]&.clone
end
def apply(event)
handler_name = "when_#{event.class.name.snake_case}".to_sym
send handler_name, event if respond_to?(handler_name)
end
end
class ReleaseProjection < SubscriberProjection
def initialize(recordings)
super()
@recordings = recordings
end
def when_release_created(event)
release = build_release_from_event_data event
@store[event.id] = release
end
def when_release_updated(event)
release = build_release_from_event_data event
@store[event.id].merge! release
end
def when_recording_updated(_event)
refresh_all_tracks
end
private
def build_release_from_event_data(event)
release = event.to_h
release[:tracks] = track_id_to_data release.fetch(:tracks)
derive_artist_from_tracks(release)
release
end
def track_id_to_data(track_ids)
track_ids.map { |id| @recordings.find(id).to_h }
end
def refresh_all_tracks
@store.values.each do |r|
r.fetch(:tracks).map! {|track| track.fetch(:id)}
r[:tracks] = track_id_to_data r.fetch(:tracks)
end
end
def derive_artist_from_tracks(release)
artists = release[:tracks].map {|rec| rec[:artist]}.uniq
release[:artist] = artists.length == 1 ? artists.first : "Various artists"
end
end
class TotalsProjection < SubscriberProjection
def initialize
super
@totals = Hash.new(0)
end
def when_recording_created(event)
handle_create_event event
end
def when_release_created(event)
handle_create_event event
end
attr_reader :totals
private
def handle_create_event(event)
@totals[event.class] += 1
end
end
@sbellware
Copy link

Shameless plug... If you like this stuff, we have a complete implementation of Event Sourcing libraries in Ruby with support for EventStore and Postgres: http://eventide-project.org/quick_look.html

@kjellm
Copy link
Author

kjellm commented Feb 13, 2017

@sbellware: Thanks for sharing, looks interesting :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment