Skip to content

Instantly share code, notes, and snippets.

@scalaview
Forked from sclinede/pipelines.rb
Created February 27, 2021 15:37
Show Gist options
  • Save scalaview/1431ae70dc1a237e5826de80c80b5f9e to your computer and use it in GitHub Desktop.
Save scalaview/1431ae70dc1a237e5826de80c80b5f9e to your computer and use it in GitHub Desktop.
Sagas implementation in pure Ruby (pre-DirtyPipelines gem)
# Base class for a single pipeline step.
#
# Subclasses override #call to perform the step, #undo to compensate for it,
# and #applied? to verify that the step really took effect.
#
# NOTE(review): `Success` is referenced unqualified below; this file defines it
# as Pipelines::Success — confirm how it is made visible from this namespace.
class DirtyPipeline::Action
  # Prepended to every subclass (see .inherited) so that an event is published
  # right before the subclass's own #call runs.
  module WrappedCall
    def call
      # Sibling constants are fully qualified: the compact
      # `class DirtyPipeline::Action` form does not add DirtyPipeline to the
      # lexical constant lookup path.
      DirtyPipeline::Events.publish!(DirtyPipeline::Events.generate(self))
      super
    end
  end

  class << self
    attr_accessor :attempted_event_klass, :pipeline, :timeout

    # Wraps each subclass's #call with WrappedCall and copies the class-level
    # settings down unless the subclass already has its own.
    def inherited(child_klass)
      super
      child_klass.prepend(WrappedCall)
      child_klass.attempted_event_klass ||= attempted_event_klass
      child_klass.pipeline ||= pipeline
      child_klass.timeout ||= timeout
    end

    # Rebuilds an action instance from a stored event.
    #
    # The unpacked event is expected to carry at least :subject_name,
    # :subject_id, :action_name and :context (string or symbol keys).
    def from_event(event)
      attributes = DirtyPipeline::Events.unpack(event).symbolize_keys
      attributes[:subject] = find_subject(**attributes)
      build_action(**attributes)
    end

    # Loads the subject record by class name and id.
    #
    # NOTE(review): the class name comes straight from event/context data and
    # is resolved with const_get — make sure that data is trusted.
    def find_subject(subject_name:, subject_id:, **)
      Object.const_get(subject_name).find(subject_id)
    end

    # Instantiates the action class named by +action_name+.
    def build_action(action_name:, context:, subject: nil, **)
      Object.const_get(action_name).new(subject, context: context)
    end

    # Builds an action, runs it and returns the action instance (not the
    # result of #call), so callers can inspect #status afterwards.
    def call(subject = nil, context: nil, **opts)
      new(subject, context: context, **opts).tap(&:call)
    end
  end

  # Default per-action timeout. DirtyPipeline::Base#call sums the timeouts of
  # its steps and passes the total to the lock (unit is not defined here).
  self.timeout = 5

  attr_reader :subject, :status, :context

  # @param subject the object the action operates on; when omitted it is
  #   looked up through .find_subject using the merged context
  # @param context [Hash, nil] shared pipeline context
  # @param opts extra entries merged into the context
  def initialize(subject = nil, context: nil, **opts)
    @context = (context || {}).merge(opts)
    @status = Success.new
    # Use the merged @context rather than the raw +context+ argument: the raw
    # argument may be nil and does not include +opts+.
    @subject = subject || self.class.find_subject(**@context.symbolize_keys)
  end

  # Performs the step. No-op by default.
  def call; end

  # Compensates for a previously performed step. No-op by default.
  def undo; end

  # Verify external state and decide whether compensation is needed or not.
  def applied?
    true
  end
end
# Finalizes a pipeline run described by a "done" event.
#
# When the run succeeded (event.call_success is truthy) every recorded step is
# validated via Action#applied?; otherwise every recorded step is undone in
# reverse order. A failing cleanup marks the lock as failed; a successful one
# releases it.
#
# NOTE(review): `Success` / `Failure` are referenced unqualified; this file
# defines them under the Pipelines module — confirm how they are exposed here.
class DirtyPipeline::Clean
  # Status produced by the most recent #call (nil before the first call).
  attr_reader :status

  def initialize(done_event)
    @event = done_event
    @strategy = done_event.call_success ? :validate : :undo
  end

  # Runs the chosen strategy and returns its status.
  def call
    @status = nil
    @status = send(@strategy)
    unless @status.success?
      DirtyPipeline::Lock.fail!(@status.payload[:subject], @status.errors)
    end
    @status
  ensure
    # @status stays nil when the strategy raised; in that case the lock is
    # deliberately left in place and the original exception propagates.
    if @status&.success?
      DirtyPipeline::Lock.new(@event.pipeline, @event.subject).release
    end
  end

  private

  # Compensates every recorded step, newest first. Stops at the first action
  # whose status reports a failure.
  def undo
    pipeline_events.reverse_each do |event|
      action = DirtyPipeline::Action.from_event(event).tap(&:undo)
      return undo_failure(action) if action.status.failure?

      DirtyPipeline::Events.complete!(event)
    end
    Success.new
  end

  def undo_failure(action)
    Failure.new(errors: { self => :undo_failed }, subject: action.subject)
  end

  # Checks that every recorded step is really applied. Stops at the first
  # action that is not.
  def validate
    pipeline_events.each do |event|
      action = DirtyPipeline::Action.from_event(event)
      return validation_failure(action) unless action.applied?

      DirtyPipeline::Events.complete!(event)
    end
    Success.new
  end

  def validation_failure(action)
    Failure.new(errors: { self => :broken_state }, subject: action.subject)
  end

  # Events recorded for the pipeline/subject pair carried by the done event
  # (the same pair the lock is keyed on in #call).
  def pipeline_events
    DirtyPipeline::Events.select_for(@event.pipeline, @event.subject)
  end
end
# Event stream facade. Every method here is an empty stub (returns nil) that
# marks an integration point for a real event store.
#
# The rest of the code calls these on the class itself
# (e.g. `Events.publish!(Events.generate(self))`), so they are defined as
# class methods.
class DirtyPipeline::Events
  class << self
    # Unpack an Event's payload to a hash.
    def unpack(event); end

    # Mark an Event as processed.
    def complete!(event); end

    # Read the Events for a pipeline with the given subject.
    def select_for(pipeline, subject); end

    # Build an Event.
    def generate(action_or_pipeline); end

    # Publish a freshly built Event to the Events stream.
    def publish!(event); end
  end

  # Instance-level equivalents are kept for backward compatibility and simply
  # forward to the class-level methods.
  %i[unpack complete! select_for generate publish!].each do |name|
    define_method(name) { |*args| self.class.public_send(name, *args) }
  end
end
# Lock for a (pipeline, subject) pair. The storage-facing methods (.fail!,
# #failed?, #locked?, #release, #lock) are empty stubs in this file.
#
# NOTE(review): `Failure` is referenced unqualified; this file defines it as
# Pipelines::Failure — confirm how it is exposed in this namespace.
class DirtyPipeline::Lock
  # Marks the subject as failed with the given errors. Stub.
  def self.fail!(subject, errors); end

  def initialize(pipeline, subject)
    @pipeline = pipeline
    @subject = subject
  end

  # Runs the given block under the lock unless the subject is already failed
  # or locked, in which case a Failure status is returned and the block is
  # not run.
  #
  # @param timeout forwarded to #lock
  # @yield the work to perform while holding the lock
  def when_locked(timeout, &block)
    return manual_failure if failed?
    return locked_failure if locked?

    lock(timeout, &block)
  end

  def manual_failure
    Failure.new(errors: { self => :needs_manual_recovery })
  end

  def locked_failure
    Failure.new(errors: { self => :pipeline_locked })
  end

  # Stub: whether the subject was marked as failed.
  def failed?; end

  # Stub: whether the lock is currently held.
  def locked?; end

  # Stub: releases the lock.
  def release; end

  # Stub: acquires the lock and is handed the block from #when_locked.
  def lock(timeout, &block); end
end
# Result objects for pipeline steps: a status wraps a payload hash and is
# either a Success (never has errors) or a Failure (errors live under the
# :errors key of the payload).
module Pipelines
  module Status
    # Shared behaviour for Success and Failure. Subclasses must implement
    # #errors and return a Hash.
    class Base
      attr_reader :payload
      alias data payload

      # @param payload [Hash, nil] status data; when omitted, a hash that
      #   auto-creates an empty nested hash for every missing key is used
      def initialize(payload = nil)
        @payload = payload || Hash.new { |hash, key| hash[key] = {} }
      end

      # Combines two statuses into a new one. The result is a Failure when
      # either side is a failure, otherwise a Success.
      #
      # Payloads are merged with +other_status+ winning on conflicts, except
      # for :errors: when both sides carry an errors hash the two are merged,
      # so errors from the receiver are not silently dropped.
      #
      # @param other_status [Status::Base]
      # @return [Status::Base] a new Success or Failure
      # @raise [ArgumentError] when +other_status+ is not a status object
      def merge(other_status)
        unless other_status.is_a?(Base)
          raise ArgumentError, "expected a #{Base}, got #{other_status.class}"
        end

        klass = other_status.failure? || failure? ? Failure : Success
        merged = payload.merge(other_status.payload) do |key, mine, theirs|
          if key == :errors && mine.is_a?(Hash) && theirs.is_a?(Hash)
            mine.merge(theirs)
          else
            theirs
          end
        end
        klass.new(merged)
      end

      def failure?
        !success?
      end

      # A status is successful when it reports no errors.
      def success?
        errors.empty?
      end
    end
  end

  # Status carrying errors under payload[:errors].
  class Failure < Status::Base
    # @return [Hash] the recorded errors ({} when none were stored)
    def errors
      payload[:errors].to_h
    end
  end

  # Status without errors.
  class Success < Status::Base
    # @return [Hash] always empty
    def errors
      {}
    end
  end
end
# A pipeline: an ordered list of action classes executed under a lock for a
# given subject, followed by a cleanup phase (DirtyPipeline::Clean).
#
# NOTE(review): `Success` is referenced unqualified; this file defines it as
# Pipelines::Success — confirm how it is exposed in this namespace.
class DirtyPipeline::Base
  class << self
    # The class used to key the lock in #call.
    #
    # NOTE(review): .inherited copies the *parent's* root, so every descendant
    # of DirtyPipeline::Base resolves to DirtyPipeline::Base itself — confirm
    # that one shared root is intended.
    def root
      @root || self
    end

    # Action classes making up this pipeline, in execution order.
    def steps
      @steps ||= []
    end

    def inherited(new_klass)
      super
      new_klass.instance_variable_set(:@steps, [])
      new_klass.instance_variable_set(:@root, root)
    end

    # Returns a new anonymous pipeline class whose steps are this class's
    # steps plus +klass+. The receiver is left untouched.
    def chain(klass)
      # Fully qualified: a bare `Base` is not reachable from the singleton
      # scope of a compact-style `class DirtyPipeline::Base` definition.
      Class.new(DirtyPipeline::Base).tap do |chained|
        chained.instance_variable_set(:@steps, steps + [klass])
      end
    end
  end

  attr_reader :subject, :context, :status

  # @param subject the object the pipeline operates on
  # @param context [Hash, nil] initial shared context
  # @param opts extra entries merged into the context
  def initialize(subject, context: nil, **opts)
    @subject = subject
    @status = Success.new
    @context = (context || {}).merge(opts)
  end

  # Runs every step in order under the pipeline lock, stopping at the first
  # failing step, then publishes a "done" event and runs the cleanup phase.
  #
  # @param context extra context entries for this run (keys are stringified)
  # @return whatever Lock#when_locked returns; the block passed to it
  #   evaluates to the step status merged with the cleanup status
  def call(**context)
    steps = self.class.steps
    # The lock timeout is the sum of the timeouts of all steps.
    total_timeout = steps.sum(&:timeout)

    DirtyPipeline::Lock.new(self.class.root, @subject).when_locked(total_timeout) do
      shared_context = @context.merge(context.stringify_keys)
      steps.each do |action|
        @status = action.call(@subject, context: shared_context).status
        break if @status.failure?
      end

      done = DirtyPipeline::Events.publish!(DirtyPipeline::Events.generate(self))
      # Clean#call returns the cleanup status, so use its return value
      # directly instead of relying on a reader on the Clean instance.
      cleanup_status = DirtyPipeline::Clean.new(done).call
      @status = @status.merge(cleanup_status)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment