Created
March 25, 2024 18:26
-
-
Save backpackerhh/c63bcb5e0b006ccbb912435553fd82c7 to your computer and use it in GitHub Desktop.
Event bus implemented in Ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
YourApp.register_provider :domain_events, namespace: true do # dry-system | |
prepare do | |
Dir[target.root.join("path/to/**/*_event_subscriber.rb")].each { |file| require file } | |
register "subscribers", EventSubscriber.subclasses | |
end | |
start do | |
register "bus", InMemoryEventBus.new | |
register "async_bus", SidekiqEventBus.new | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Event | |
attr_reader :aggregate_id, :aggregate_attributes, :occurred_at, :id, :name | |
private_class_method :new | |
def self.from_primitives(attributes) | |
new(id: attributes.fetch(:id, SecureRandom.uuid), | |
aggregate_id: attributes.fetch(:aggregate_id), | |
aggregate_attributes: attributes.fetch(:aggregate_attributes), | |
occurred_at: attributes.fetch(:occurred_at)) | |
end | |
def initialize(id:, aggregate_id:, aggregate_attributes:, occurred_at:) | |
@id = id | |
@aggregate_id = aggregate_id | |
@aggregate_attributes = aggregate_attributes | |
@occurred_at = occurred_at | |
@name = self.class.name | |
end | |
def to_primitives | |
{ | |
id:, | |
type: name, | |
occurred_at: occurred_at.strftime("%Y-%m-%d %H:%M:%S.%N %z"), | |
attributes: { | |
id: aggregate_id, | |
**aggregate_attributes | |
} | |
} | |
end | |
def ==(other) | |
name == other.name && | |
occurred_at == other.occurred_at && | |
aggregate_id == other.aggregate_id && | |
aggregate_attributes == other.aggregate_attributes | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class EventBus | |
include Deps[event_subscribers: "domain_events.subscribers"] # dry-auto_inject | |
attr_reader :event_subscriptions | |
def initialize(...) | |
super(...) | |
@event_subscriptions = Hash.new { |hash, key| hash[key] = [] } | |
event_subscribers.each do |event_subscriber_klass| | |
event_subscriber = event_subscriber_klass.new | |
event_subscriber.subscribed_to.each do |event_klass| | |
@event_subscriptions[event_klass] << event_subscriber | |
end | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "json-schema" | |
class EventSerializer | |
def self.serialize(event) | |
uuid_regex_pattern = "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$" | |
time_regex_pattern = /\A\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{9} \+\d{4}\z/ | |
schema = { | |
id: "domain-events-serializer", | |
type: "object", | |
required: %w[id type occurred_at attributes], | |
properties: { | |
id: { type: "string", pattern: uuid_regex_pattern }, | |
type: { type: "string" }, | |
occurred_at: { type: "string", pattern: time_regex_pattern }, | |
attributes: { | |
type: "object", | |
required: ["id"], | |
properties: { | |
id: { type: "string", pattern: uuid_regex_pattern } | |
} | |
} | |
} | |
} | |
validation_errors = JSON::Validator.fully_validate(schema, event.to_primitives) | |
if validation_errors.any? | |
raise Domain::InvalidEventSchemaError, validation_errors | |
end | |
JSON.parse({ data: { **event.to_primitives } }.to_json) | |
end | |
def self.deserialize(raw_event) | |
event_klass = Object.const_get(raw_event.dig("data", "type")) | |
event_klass.from_primitives( | |
aggregate_id: raw_event.dig("data", "attributes", "id"), | |
aggregate_attributes: raw_event.dig("data", "attributes").except("id").transform_keys(&:to_sym), | |
occurred_at: Time.parse(raw_event.dig("data", "occurred_at")), | |
id: raw_event.dig("data", "id") | |
) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class EventSubscriber | |
def on(event) | |
raise NotImplementedError, "Define what will the event subscriber do upon receiving an event in #on method" | |
end | |
def subscribed_to | |
raise NotImplementedError, "Define the list of events in #subscribed_to method" | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class OrderCreatedEvent < Event | |
def self.from(order) | |
from_primitives( | |
aggregate_id: order.id.value, | |
aggregate_attributes: { | |
amount: order.amount.value, | |
# other attributes... | |
}, | |
occurred_at: order.created_at.value | |
) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CreateOrderCommissionOnOrderCreatedEventSubscriber < EventSubscriber | |
def on(event) | |
CreateOrderCommissionUseCase.new.create( | |
order_id: event.aggregate_id, | |
order_amount: event.aggregate_attributes.fetch(:amount) | |
) | |
end | |
def subscribed_to | |
[OrderCreatedEvent] | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CreateOrderUseCase | |
include Deps[event_bus: "domain_events.async_bus"] | |
def create(attributes) | |
# omitted... | |
event_bus.publish(OrderCreatedEvent.from(order)) | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class InMemoryEventBus < EventBus | |
def publish(event) | |
event_subscriptions[event.class].each { |subscriber| subscriber.on(event) } | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "sidekiq" | |
class PublishEventJob | |
include Sidekiq::Job | |
sidekiq_options queue: "domain_events", unique: true, retry_for: 3600 # 1 hour | |
def perform(subscriber_klass_name, raw_event) | |
event = EventSerializer.deserialize(raw_event) | |
Object.const_get(subscriber_klass_name).new.on(event) | |
logger.info("Job enqueued to publish event #{event.id}") | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SidekiqEventBus < EventBus | |
def publish(event) | |
event_subscriptions[event.class].each do |subscriber| | |
PublishEventJob.perform_async(subscriber.class.name, EventSerializer.serialize(event)) | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment