-
-
Save scalaview/1431ae70dc1a237e5826de80c80b5f9e to your computer and use it in GitHub Desktop.
Sagas implementation in pure Ruby (pre-DirtyPipelines gem)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DirtyPipeline::Action | |
module WrappedCall | |
def call | |
Events.publish! Event.generate(self) | |
super | |
end | |
end | |
class << self | |
attr_accessor :attempted_event_klass, :pipeline, :timeout | |
def inherited(child_klass) | |
child_klass.prepend(WrappedCall) | |
child_klass.attempted_event_klass ||= self.attempted_event_klass | |
child_klass.pipeline ||= self.pipeline | |
child_klass.timeout ||= self.timeout | |
end | |
def from_event(event) | |
event = Events.unpack(event) | |
event_as_kwargs = event.symbolize_keys | |
event_as_kwargs[:subject] = find_subject(**event_as_kwargs) | |
build_action(**event_as_kwargs) | |
end | |
def find_subject(subject_name:, subject_id:, **) | |
Kernel.const_get(subject_name).find(subject_id) | |
end | |
def build_action(action_name:, context:, subject: nil, **) | |
Kernel.const_get(action_name).new(subject, context: context) | |
end | |
def call(subject = nil, context: nil, **opts) | |
new(subject, context: context, **opts).tap(&:call) | |
end | |
end | |
self.timeout = 5 | |
attr_reader :subject, :status, :context | |
def initialize(subject = nil, context: nil, **opts) | |
@context = (context || Hash.new).merge(opts) | |
@status = Success.new | |
@subject = subject || self.class.find_subject(**context.symbolize_keys) | |
end | |
def call; end | |
def undo; end | |
# verify external state and decide weather we need compensation or not | |
def applied? | |
true | |
end | |
end | |
class DirtyPipeline::Clean | |
def initialize(done_event) | |
@event = done_event | |
@strategy = done_event.call_success ? :validate : :undo | |
end | |
def call | |
status = send(@strategy).tap |status| | |
next if status.success? | |
Lock.fail!(status.payload[:subject], status.errors) | |
end | |
ensure | |
Lock.new(@event.pipeline, @event.subject).release if status.success? | |
end | |
private | |
def undo | |
pipeline_events.reverse_each do |event| | |
action = Action.from_event(event).tap(&:undo) | |
return undo_failure(action) if action.status.failure? | |
Events.complete!(event) | |
end | |
Success.new | |
end | |
def undo_failure(action) | |
Failure.new(errors: {self => :undo_failed}, subject: action.subject) | |
end | |
def validate | |
pipeline_events.each do |event| | |
action = Action.from_event(event) | |
return validation_failure(action) unless action.applied? | |
Events.complete!(event) | |
end | |
Success.new | |
end | |
def validation_failure(action) | |
Failure.new(errors: {self => :broken_state}, subject: action.subject) | |
end | |
def pipeline_events | |
Events.select_for(@pipeline_name, @subject) | |
end | |
end | |
class DirtyPipeline::Events | |
# unpack Event's payload to a hash | |
def unpack(event) | |
end | |
# mark Event as processed | |
def complete!(event) | |
end | |
# read Events for a pipeline with subject | |
def select_for(pipeline, subject) | |
end | |
# built an Event | |
def generate(action_or_pipeline) | |
end | |
# publish freshly built Event to the Events stream | |
def publish!(event) | |
end | |
end | |
class DirtyPipeline::Lock | |
def self.fail!(subject, errors); end | |
def initialize(pipeline, subject) | |
@pipeline = pipeline | |
@subject = subject | |
end | |
def when_locked(timeout) | |
return manual_failure if failed? | |
return locked_failure if locked? | |
lock(timeout, &block) | |
end | |
def manual_failure | |
Failure.new(errors: {self => :needs_manual_recovery}) | |
end | |
def locked_failure | |
Failure.new(errors: {self => :pipeline_locked}) | |
end | |
def failed?; end | |
def locked?; end | |
def release; end | |
def lock(timeout, &block); end | |
end | |
module Pipelines | |
module Status | |
class Base | |
attr_reader :payload | |
alias :data :payload | |
def initialize(payload = nil) | |
@payload = payload || Hash.new { |h,k| h[k] = Hash.new } | |
end | |
def merge(other_status) | |
raise ArgumentError unless Base === other_status | |
klass = Failure if other_status.failure? || failure? | |
klass ||= Success | |
klass.new(payload.merge(other_status.payload)) | |
end | |
def failure? | |
!success? | |
end | |
def success? | |
errors.empty? | |
end | |
end | |
end | |
class Failure < Status::Base | |
def errors | |
payload[:errors].to_h | |
end | |
end | |
class Success < Status::Base | |
def errors | |
{} | |
end | |
end | |
end | |
class DirtyPipeline::Base | |
class << self | |
def root | |
@root || self | |
end | |
def inherited(new_klass) | |
new_klass.instance_variable_set(:@steps, []) | |
new_klass.instance_variable_set(:@root, self.root) | |
end | |
def chain(klass) | |
c = Class.new(Base) | |
c.instance_variable_set(:@steps, self.steps.to_a + [klass]) | |
c | |
end | |
end | |
attr_reader :subject, :context | |
def initialize(subject, context: nil, **opts) | |
@subject = subject | |
@status = Success.new | |
@context = (context || Hash.new).merge(opts) | |
end | |
def call(**context) | |
timeout = self.class.steps.map(&:timeout).sum | |
Lock.new(self.class.root, @subject).when_locked(timeout) do | |
shared_context = @context.merge(context.stringify_keys) | |
self.class.steps.each do |action| | |
@status = action.call(@subject, context: shared_context).status | |
break if @status.failure? | |
end | |
done = Events.publish!(Events.generate(self)) | |
clean = Clean.new(done).tap(&:call) | |
@status.merge(clean.status) | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment