Last active
September 2, 2024 18:04
-
-
Save dkubb/0bad9c33802c3f14ff60cc23a8063388 to your computer and use it in GitHub Desktop.
Class to process actions in parallel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
require 'anima' | |
require 'set' | |
require 'thread' | |
# Dependency-aware runner that executes registered actions on worker threads
#
# Actions are registered with {#add}. Each action names the previously
# registered actions whose results it consumes, and {#process} runs an action
# once every one of its dependencies has produced a result.
#
# A dependency must already be registered when an action is added, so an
# action can never depend on itself or on an action added later.
class Actions
  # Description of a single action: its name, the names of the actions it
  # depends on, and the block that computes its result
  class Action
    include Adamantium, Anima.new(:name, :dependencies, :block)

    # Execute the action
    #
    # Only the entries of +results+ named in +dependencies+ are forwarded to
    # the block, and they are passed as keyword arguments.
    #
    # @example with no results
    #   action = Action.new(name: :a, dependencies: [], block: -> { 1 })
    #   action.execute # => 1
    #
    # @example with results
    #   action = Action.new(name: :b, dependencies: [:a], block: ->(a:) { a + 1 })
    #   action.execute(a: 1) # => 2
    #
    # @param results [Hash{Symbol => Object}] results of completed actions
    #
    # @return [Object] result of the action
    def execute(results = {})
      @block.call(**results.slice(*dependencies))
    end

    # Return dependencies that are not yet satisfied
    #
    # @example with no dependencies
    #   action = Action.new(name: :a, dependencies: [], block: -> { 1 })
    #   action.needs({}) # => []
    #
    # @example with dependencies
    #   action = Action.new(name: :b, dependencies: [:a], block: ->(a:) { a + 1 })
    #   action.needs({})   # => [:a]
    #   action.needs(a: 1) # => []
    #   action.needs(b: 2) # => [:a]
    #
    # @param results [Hash{Symbol => Object}] results of completed actions
    #
    # @return [Array<Symbol>] list of dependencies that are not yet satisfied
    def needs(results = {})
      dependencies - results.keys
    end

    # Check if the action is ready to be processed
    #
    # @example with no dependencies
    #   action = Action.new(name: :a, dependencies: [], block: -> { 1 })
    #   action.ready? # => true
    #
    # @example with dependencies
    #   action = Action.new(name: :b, dependencies: [:a], block: ->(a:) { a + 1 })
    #   action.ready?(a: 1) # => true
    #   action.ready?(b: 2) # => false
    #
    # @param results [Hash{Symbol => Object}] results of completed actions
    #
    # @return [Boolean] true if every dependency has a result
    def ready?(results = {})
      needs(results).empty?
    end
  end

  # Raised by {#add} once the work queue has been closed by {#process}
  class ProcessingFinishedError < RuntimeError; end

  # Raised by {#add} when an action with the same name is already registered
  class DuplicateActionError < ArgumentError; end

  # Raised by {#add} when a dependency names an action that is not registered
  class UnknownDependencyError < ArgumentError; end

  # Initialize the actions object
  #
  # @return [Actions] self
  def initialize
    @actions    = {}
    # dependency name => names of the actions still waiting on it
    @dependents = Hash.new { |dependents, dependency| dependents[dependency] = Set.new }
    @queue      = Thread::Queue.new
    # guards @actions, @dependents and @results
    @mutex      = Mutex.new
    @condition  = ConditionVariable.new
    @results    = {}
  end

  # Add an action to the list of actions to be processed
  #
  # An action whose dependencies all have results already is queued
  # immediately; otherwise it is recorded as a dependent of each missing
  # dependency and queued when the last of them completes.
  #
  # @example
  #   actions = Actions.new
  #   actions.add(:a) { 1 }
  #   actions.add(:b, [:a]) { |a:| a + 1 }
  #
  # @param name [Symbol] name of the action
  # @param dependencies [Array<Symbol>] names of the actions this one depends on
  # @param block [Proc] block to execute the action
  #
  # @yield dependency results, passed as keyword arguments named after each dependency
  # @yieldreturn [Object] result of the action
  #
  # @raise [ArgumentError] if block is not given
  # @raise [ProcessingFinishedError] if processing has finished
  # @raise [DuplicateActionError] if the action already exists
  # @raise [UnknownDependencyError] if any dependency is unknown
  #
  # @return [Actions] self
  def add(name, dependencies = [], &block)
    raise ArgumentError, 'Block is required' unless block

    @mutex.synchronize do
      assert_processing_not_finished
      assert_unique_action(name)
      assert_known_dependencies(dependencies)

      action = @actions[name] = Action.new(name:, dependencies:, block:)
      needs  = action.needs(@results)

      if needs.empty?
        @queue << action
      else
        needs.each { |dependency| @dependents[dependency] << name }
      end
    end

    self
  end

  # Process all actions in parallel
  #
  # Blocks until every registered action has a result, then closes the queue
  # so the workers exit and further calls to {#add} are rejected.
  #
  # Each worker sets +abort_on_exception+, so an exception raised by an
  # action is re-raised in the main thread rather than being swallowed.
  #
  # @example
  #   actions = Actions.new
  #   actions.add(:a) { 1 }
  #   actions.add(:b, [:a]) { |a:| a + 1 }
  #   actions.add(:c, [:b]) { |b:| b + 1 }
  #   actions.process # => { a: 1, b: 2, c: 3 }
  #
  # @param num_threads [Integer] number of worker threads to use
  #
  # @raise [ArgumentError] if num_threads is not positive
  #
  # @return [Hash{Symbol => Object}] results of all actions
  def process(num_threads: 4)
    # without at least one worker the wait below could never be satisfied
    assert_positive_thread_count(num_threads)

    workers = Array.new(num_threads) do
      Thread.new do
        Thread.current.abort_on_exception = true
        process_queue
      end
    end

    # wait for all actions to be processed
    @mutex.synchronize do
      @condition.wait(@mutex) until @results.size == @actions.size
      @queue.close
    end

    workers.each(&:join)
    @results
  end

  private

  # Process actions from the queue
  #
  # Runs until the queue is closed and empty. The action block itself is
  # executed outside the mutex so independent actions can run concurrently.
  #
  # @return [void]
  def process_queue
    while (action = @queue.shift)
      # snapshot the inputs under the lock; other workers mutate @results
      inputs = @mutex.synchronize { @results.slice(*action.dependencies) }
      result = action.execute(inputs)

      @mutex.synchronize do
        @results[action.name] = result
        enqueue_ready_actions(action.name)
        @condition.broadcast
      end
    end
  end

  # Enqueue actions that are ready to be processed
  #
  # Must be called with the mutex held.
  #
  # @param completed_action [Symbol] name of the completed action
  #
  # @return [void]
  def enqueue_ready_actions(completed_action)
    # delete returns nil when nothing waits on this action; splatting nil
    # yields no arguments, so fetch_values returns an empty list
    @actions.fetch_values(*@dependents.delete(completed_action)).each do |dependent|
      @queue << dependent if dependent.ready?(@results)
    end
  end

  # Assert that the requested number of worker threads is usable
  #
  # @param num_threads [Integer] number of worker threads requested
  #
  # @raise [ArgumentError] if num_threads is not positive
  #
  # @return [void]
  def assert_positive_thread_count(num_threads)
    raise ArgumentError, 'num_threads must be positive' unless num_threads.positive?
  end

  # Assert that processing has not finished
  #
  # @raise [ProcessingFinishedError] if processing has finished
  #
  # @return [void]
  def assert_processing_not_finished
    raise ProcessingFinishedError, 'Cannot add actions after processing has finished' if @queue.closed?
  end

  # Assert that the action is unique
  #
  # @param name [Symbol] name of the action
  #
  # @raise [DuplicateActionError] if the action already exists
  #
  # @return [void]
  def assert_unique_action(name)
    raise DuplicateActionError, "Action '#{name}' already exists" if @actions.key?(name)
  end

  # Assert that all dependencies are known
  #
  # @param dependencies [Array<Symbol>] dependencies of the action
  #
  # @raise [UnknownDependencyError] if any dependency is unknown
  #
  # @return [void]
  def assert_known_dependencies(dependencies)
    unknown_dependencies = dependencies - @actions.keys
    return if unknown_dependencies.empty?

    raise UnknownDependencyError, "Unknown dependencies: #{unknown_dependencies.join(', ')}"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment