Skip to content

Instantly share code, notes, and snippets.

@krisleech
Created February 18, 2020 16:21
Show Gist options
  • Save krisleech/4f69feef213c90e68fda28d73cb1c948 to your computer and use it in GitHub Desktop.
Save krisleech/4f69feef213c90e68fda28d73cb1c948 to your computer and use it in GitHub Desktop.
Playing with Rails Event Store
ruby basic.rb

ruby aggregate_root.rb
require_relative 'env'
# Event
#
class BookPublished < RailsEventStore::Event; end
# Aggregate Root
#
class Book
include AggregateRoot
# not needed
# attr_reader :id, :title, :author
def initialize(id)
@id = id
@title = nil
@author = nil
end
# update state
#
def publish(attributes)
# by applying an event
#
raise if ready_published? # or publish an event
apply BookPublished.new(data: attributes)
end
# change state for applied event
#
on(BookPublished) do |event|
@isbn = event.data.fetch(:isbn)
@title = event.data.fetch(:title)
@author = event.data.fetch(:author)
end
end
# Read Model
#
module ReadModel
class Book < ActiveRecord::Base
def readonly?
!new_record?
end
after_find { readonly! }
end
end
# subscribe view model to event to build the read model
# Controller
#
repository = AggregateRoot::Repository.new
attributes = { isbn: '001', title: 'Daring Greatly', author: 'Brene Brown' } # typically params from the UI
stream_name = "books$#{attributes.fetch(:isbn)}"
# create a new book aggregate root
#
book = Book.new
# update state
#
book.publish(attributes)
puts book.unpublished_events.inspect
# store new state via individual events in event store
#
repository.store(book, stream_name)
# Controller
#
# Later we want to update the aggregate root...
repository = AggregateRoot::Repository.new
attributes = { isbn: '001', title: 'dare to lead', author: 'Brene Brown' } # typically params from the UI
stream_name = "books$#{attributes.fetch(:isbn)}"
# create the state by loading and applying events from stream
#
book = repository.load(Book.new, stream_name)
# update state
#
book.publish(attributes)
# store new events in event store
#
repository.store(book, stream_name)
# note: that we can't ever update isbn because that is used to identify the
# stream.
# Stream
#
puts
puts "#### EVENTS ####"
puts "Number of events: #{EventStore.read.stream('books').of_type(BookPublished).count}"
EventStore.read.of_type(BookPublished).each do |event|
puts "Event: #{event.event_id} #{event.data}"
end
# VIEW
#
# later we want to view the aggregate root by replaying the events from the its
# stream into a blank copy of the aggregate root
#
puts
puts "#### BOOK ###"
id = '001' # params from the UI
stream_name = "books$#{id}"
book = repository.load(Book.new, stream_name)
puts book.inspect
# The problem with this is that is it slow and we can't get all or a subset of
# book easily or fast.
# how to load aggregate up to a certain time, i.e. see the state 1 month ago?
# the event is not granular enough... each event contains all attributes, not
# just those we want to change.
# how do we create the read models, we need to listen to the events, even tho
# they span different streams.
#
require_relative 'env'
# Read Model
#
class Book < ActiveRecord::Base
def readonly?
!new_record?
end
after_find { readonly! }
end
# Event
#
class BookPublished < RailsEventStore::Event; end
# Subscriber (to create read model)
#
class CreateBookModel
def call(event)
Book.create!(event.data)
end
end
EventStore.subscribe(CreateBookModel.new, to: [BookPublished])
# Controller
#
event = BookPublished.new(data: { isbn: '001', title: 'Daring Greatly', author: 'Brene Brown' })
EventStore.publish(event, stream_name: 'books')
# Stream
#
puts
puts "#### EVENTS ####"
puts "Number of events: #{EventStore.read.stream('books').of_type(BookPublished).count}"
EventStore.read.stream('books').of_type(BookPublished).each do |event|
puts "Event: #{event.event_id} #{event.data}"
end
# View
#
puts
puts "#### BOOKS ####"
Book.all.each do |book|
puts book.inspect
end
require "bundler/inline"
install_gems = begin
require 'aggregate_root'
false
rescue LoadError
true
end
gemfile(install_gems, quiet: true) do
source "https://rubygems.org"
gem "rails"
gem "pry-byebug"
gem "sqlite3"
gem 'rails_event_store'
gem 'aggregate_root'
end
require "active_record"
require "logger"
ActiveRecord::Base.establish_connection(adapter: "sqlite3", database: ":memory:")
# ActiveRecord::Base.logger = Logger.new(STDOUT)
ActiveRecord::Base.logger = nil
ActiveRecord::Schema.define do
eval(File.read('rails_event_store_migration.rb'))
create_table :books, force: true, id: false do |t|
t.string :isbn, primary_key: true
t.string :title
t.string :author
end
end
EventStore = RailsEventStore::Client.new
AggregateRoot.configure do |config|
config.default_event_store = EventStore
end
postgres = ActiveRecord::Base.connection.adapter_name == "PostgreSQL"
sqlite = ActiveRecord::Base.connection.adapter_name == "SQLite"
rails_42 = Gem::Version.new(ActiveRecord::VERSION::STRING) < Gem::Version.new("5.0.0")
enable_extension "pgcrypto" if postgres
create_table(:event_store_events_in_streams, force: false) do |t|
t.string :stream, null: false
t.integer :position, null: true
if postgres
t.references :event, null: false, type: :uuid
else
t.references :event, null: false, type: :string, limit: 36
end
t.datetime :created_at, null: false
end
add_index :event_store_events_in_streams, [:stream, :position], unique: true
add_index :event_store_events_in_streams, [:created_at]
add_index :event_store_events_in_streams, [:stream, :event_id], unique: true
if postgres
create_table(:event_store_events, id: :uuid, default: 'gen_random_uuid()', force: false) do |t|
t.string :event_type, null: false
t.binary :metadata
t.binary :data, null: false
t.datetime :created_at, null: false
end
else
create_table(:event_store_events, id: false, force: false) do |t|
t.string :id, limit: 36, primary_key: true, null: false
t.string :event_type, null: false
t.binary :metadata
t.binary :data, null: false
t.datetime :created_at, null: false
end
if sqlite && rails_42
add_index :event_store_events, :id, unique: true
end
end
add_index :event_store_events, :created_at
add_index :event_store_events, :event_type
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment