Last active
August 27, 2024 21:59
-
-
Save ismasan/0bdcc76c2ea48f4259b38fafe131edb8 to your computer and use it in GitHub Desktop.
Practical Railway-oriented Pipeline for Ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A Pipeline extension to process steps concurrently.
# Example:
#   class ConcurrentPipeline < Pipeline
#     include ConcurrentProcessing
#   end
#
#   MyPipeline = ConcurrentPipeline.new do |pl|
#     pl.step ValidateInput
#
#     # These steps run concurrently
#     pl.concurrent do |pl2|
#       pl2.step BookTickets
#       pl2.step BookHotels
#       pl2.step BookCar
#     end
#
#     pl.step SendConfirmationEmail
#   end
require 'concurrent' | |
# Mixin for pipeline classes that adds a `#concurrent` DSL method.
# Steps registered inside a `concurrent` block are each started in their own
# Concurrent::Future and their results are gathered into a single Result.
module ConcurrentProcessing
  # A Step that wraps a sub-pipeline and runs that pipeline's steps
  # concurrently instead of sequentially. Everything other than #call is
  # delegated to the wrapped pipeline (e.g. #steps).
  class ConcurrentPipelineStep < SimpleDelegator
    # Runs every step of the wrapped pipeline in its own future and waits for
    # all of them to finish.
    #
    # Each step receives a copy of +result+ whose context[:trace] is the
    # incoming trace extended with the step's 1-based position.
    # NOTE(review): `deep_dup` is not defined in this file — presumably it is
    # provided by ActiveSupport; confirm it is loaded wherever this is used.
    #
    # TODO: we need to replicate the handling of `context[:trace]` here.
    # this could be handled elsewhere for all pipelines types.
    #
    # @param result [Result]
    # @return [Result] +result+ halted with the values of the halted
    #   sub-results if any step halted; otherwise +result+ continued with the
    #   values of all sub-results, in step order.
    # @raise [StandardError] whatever a step raised inside its future.
    def call(result)
      trace = result.context[:trace] || []

      # Start every future before waiting on any of them.
      futures = steps.map.with_index(1) do |step, position|
        Concurrent::Future.execute do
          step.call(result.deep_dup.with_context(:trace, trace + [position]))
        end
      end

      # `value!` re-raises an exception raised inside the future. Plain `value`
      # would return nil for a failed step and blow up later with an unrelated
      # NoMethodError on nil, hiding the real error.
      results = futures.map(&:value!)

      halted = results.reject(&:continue?)
      return result.halt(halted.map(&:value)) if halted.any?

      result.continue(results.map(&:value))
    end
  end

  # Build a sub-pipeline that will run steps in parallel.
  # The block receives a new pipeline of the same class as the receiver; the
  # resulting sub-pipeline is registered as a single step of the receiver.
  #
  # @yieldparam pipeline [Pipeline] the sub-pipeline to register steps on
  # @return [self] (the return value of #step)
  def concurrent(&block)
    step ConcurrentPipelineStep.new(self.class.new(&block))
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
require 'delegate' | |
# A generic pipeline to implement railway-oriented processing.
# Blog post: https://ismaelcelis.com/posts/practical-railway-oriented-pipelines-in-ruby/
# @example
#
#   pipeline = Pipeline.new do |pl|
#     # with a Step interface [#call(Result) Result]
#     # It can be any object that implements that interface
#     # including Procs, lambdas, custom instances, classes or modules, or other [Pipeline] instances.
#     pl.step Step1
#
#     # with an inline Step block
#     pl.step do |result|
#       result.continue(result.value.first(result.params[:limit]))
#     end
#   end
#
#   initial_result = Pipeline::Result.continue(['a', 'b', 'c', ...etc], params: { limit: 5 })
#   result = pipeline.call(initial_result)
#   result.continue? # => true
#
# [Result] is expected to respond to [#continue?() Boolean], [#params() Hash] and [#context() Hash].
class Pipeline
  # Frozen value object passed from step to step.
  # Every "mutator" (#continue, #halt, #with_error, #with_context) returns a
  # new Result and leaves the receiver untouched.
  class Result
    # Default +errors+ for results built without an explicit errors hash.
    # It is frozen and its default block returns a fresh Array *without storing
    # it*, so reading `result.errors[:missing]` can never add keys to this
    # shared constant and leak them into unrelated results.
    ERRORS = Hash.new { |_hash, _key| [] }.freeze

    attr_reader :value, :params, :context, :errors

    # Builds an initial, continuing result.
    #
    # @param value [Object]
    # @param params [Hash]
    # @param context [Hash]
    # @return [Result]
    def self.continue(value, params: {}, context: {})
      new(value, params:, context:)
    end

    # @param value [Object] the payload being processed
    # @param params [Hash]
    # @param context [Hash]
    # @param errors [Hash] error messages keyed by error key
    # @param continue [Boolean] false marks the result as halted
    def initialize(value, params: {}, context: {}, errors: ERRORS, continue: true)
      @continue = continue
      @value = value
      @params = params
      @context = context
      @errors = errors
      freeze
    end

    def inspect
      %(<#{self.class.name}:#{object_id} [#{continue? ? 'CONTINUE' : 'HALT'}] value=#{value.inspect} params=#{params.inspect} context=#{context.inspect} errors=#{errors.inspect}>)
    end

    # @return [Boolean] whether the pipeline should keep processing this result
    def continue? = @continue

    # @param a_value [Object] new value; defaults to the current one
    # @return [Result] a continuing copy of this result
    def continue(a_value = value)
      self.class.new(a_value, params:, context:, errors:, continue: true)
    end

    # @param a_value [Object] new value; defaults to the current one
    # @return [Result] a halted copy of this result
    def halt(a_value = value)
      self.class.new(a_value, params:, context:, errors:, continue: false)
    end

    # Returns a copy with +error_message+ appended under +error_key+.
    # Builds a new messages array and a new hash, so neither the receiver's
    # errors nor the message arrays it holds are mutated.
    #
    # @param error_key [Object]
    # @param error_message [Object]
    # @return [Result]
    def with_error(error_key, error_message)
      # `fetch` (not `[]`) so a storing default block on a caller-supplied
      # hash is never triggered on the receiver's errors.
      messages = errors.fetch(error_key, []) + [error_message]
      self.class.new(value, params:, context:, errors: errors.merge(error_key => messages), continue: continue?)
    end

    # Returns a copy with +key+ set in the context.
    # With a block, the block receives the current context value for +key+
    # (or +default_value+ when the key is absent) and its return value is
    # stored. Without a block, +default_value+ itself is stored.
    #
    # @param key [Object]
    # @param default_value [Object]
    # @return [Result]
    def with_context(key, default_value, &block)
      ctx_value = block ? block.call(context.fetch(key, default_value)) : default_value
      self.class.new(value, params:, context: context.merge(key => ctx_value), errors:, continue: continue?)
    end
  end

  class << self
    # @param stack [Array<#call(Step, Result) Result>]
    attr_writer :middleware_stack

    # Class-level middleware stack
    # a Middleware decorates each step in the pipeline, including other pipelines of the same class or subclass.
    # @return [Array<#call(Step, Result) Result>]
    def middleware_stack
      @middleware_stack ||= []
    end

    # Adds a middleware to the class-level middleware stack.
    # Example:
    #
    #   class MyPipeline < Pipeline
    #     # With a block:
    #     middleware do |step, result|
    #       p result.context[:trace]
    #       step.call(result)
    #     end
    #
    #     # With a callable object:
    #     middleware Logging.new(Rails.logger)
    #   end
    #
    # @param middleware [#call(Step, Result) Result, nil]
    # @yieldparam step [Step]
    # @yieldparam result [Result]
    # @return [self]
    # @raise [ArgumentError] when neither a callable nor a block is given
    def middleware(middleware = nil, &block)
      middleware ||= block
      raise ArgumentError, "Middleware expects a block or a callable object" unless middleware

      middleware_stack << middleware
      self
    end

    # Subclasses start with a copy of the parent's middleware stack, so
    # middleware added to a subclass does not affect the parent.
    def inherited(subclass)
      subclass.middleware_stack = middleware_stack.dup
      super
    end

    # Helper to build a pipeline composed of steps
    #
    #   pipe = Pipeline.compose([Step1.new, Step2.new])
    #
    # @param [Array<Step>] steps
    # @return [Pipeline]
    def compose(steps)
      new do |pl|
        steps.each { |st| pl.step(st) }
      end
    end
  end

  attr_reader :steps

  # @yieldparam pipeline [Pipeline] self, to register steps on.
  #   When a block is given the pipeline is frozen once the block returns,
  #   regardless of the block's return value. Without a block the pipeline
  #   stays open so steps can still be added.
  def initialize(&setup)
    @steps = []
    return unless setup

    setup.call(self)
    freeze
  end

  # Freezes the steps list along with the pipeline itself.
  def freeze
    @steps.freeze
    super
  end

  # @return [Boolean] true when +other+ is an instance of this pipeline's
  #   class and its steps compare equal to this pipeline's steps
  def ==(other)
    other.is_a?(self.class) && other.steps == steps
  end

  # Appends a [Step] to the internal pipeline.
  # The step is wrapped in every middleware of the class-level stack and
  # finally in a StepTracker.
  # Example:
  #
  #   Pipeline.new do |pl|
  #     # with a Step interface
  #     pl.step Step1
  #
  #     # with an inline Step block
  #     pl.step do |result|
  #       result.continue(result.value.first(10))
  #     end
  #   end
  #
  # @param callable [Step]. Optional.
  # @param &block [Proc]. Optional.
  # @return [self]
  # @raise [ArgumentError] when the step does not respond to #call
  def step(callable = nil, &block)
    callable ||= block
    raise ArgumentError, "#step expects an interface #call(Result) Result, but got #{callable.inspect}" unless callable.respond_to?(:call)

    decorated = self.class.middleware_stack.reduce(callable) { |inner, mid| MiddlewareStep.new(inner, mid) }
    @steps << StepTracker.new(decorated)
    self
  end

  # A step to decorate middleware as a [Step] interface.
  class MiddlewareStep < SimpleDelegator
    # @param step [Step]
    # @param mid [#call(Step, Result) Result]
    def initialize(step, mid)
      super(step)
      @mid = mid
    end

    # Hands the wrapped step and the result to the middleware.
    #
    # @param result [Result]
    # @return [Result]
    def call(result)
      @mid.call(__getobj__, result)
    end
  end

  # A final piece of middleware to track failed (halted) steps in the context.
  class StepTracker < SimpleDelegator
    # Calls the wrapped step; if the returned result is halted, stores the
    # wrapped step under context[:halted_step].
    #
    # @param result [Result]
    # @return [Result]
    def call(result)
      outcome = __getobj__.call(result)
      return outcome if outcome.continue?

      outcome.with_context(:halted_step, __getobj__)
    end
  end

  # A pipeline is a Step
  # and can be composed into other pipelines.
  # A pipeline's #call also collects a recursive trace of each step's position in the tree,
  # passing it down to each step in #context[:trace].
  # Example: a step receiving a trace of `[1,2,1]` means the step is running as the first step of the second step of the first step.
  #
  #   Step [1]
  #     - Step [1,1]
  #     - Step [1,2]
  #       - Step [1,2,1]
  #
  # Once a result is halted the remaining steps are skipped and that result
  # is returned as-is.
  #
  # @param result [Result]
  # @return [Result]
  def call(result)
    trace = result.context[:trace] || []
    steps.each.with_index(1).reduce(result) do |res, (step, position)|
      next res unless res.continue?

      step.call(res.with_context(:trace, trace + [position]))
    end
  end

  def inspect
    %(<#{self.class.name}:#{object_id} #{steps.size} steps>)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you @jspillers ! I'll definitely have a look at your library.