Skip to content

Instantly share code, notes, and snippets.

@dkubb
Last active September 2, 2024 18:04
Show Gist options
  • Save dkubb/0bad9c33802c3f14ff60cc23a8063388 to your computer and use it in GitHub Desktop.
Save dkubb/0bad9c33802c3f14ff60cc23a8063388 to your computer and use it in GitHub Desktop.
Class to process actions in parallel
# frozen_string_literal: true
require 'anima'
require 'set'
require 'thread'
# Actions class to process actions in parallel
class Actions
# Action class to store action details
class Action
include Adamantium, Anima.new(:name, :dependencies, :block)
# Run the action's block with the results of its dependencies
#
# Only the entries of +results+ that are named in +dependencies+ are
# forwarded, and they are handed to the block as keyword arguments.
#
# @example without dependencies
#   action = Action.new(name: :a, dependencies: [], block: -> { 1 })
#   action.execute({}) # => 1
#
# @example with dependencies
#   action = Action.new(name: :b, dependencies: [:a], block: ->(a:) { a + 1 })
#   action.execute(a: 1) # => 2
#
# @param results [Hash] results of all actions
#
# @return [Object] result of the action
def execute(results)
  inputs = results.slice(*dependencies)
  @block.call(**inputs)
end
# List the dependencies that do not have a result yet
#
# @example without dependencies
#   action = Action.new(name: :a, dependencies: [], block: -> { 1 })
#   action.needs({}) # => []
#
# @example with dependencies
#   action = Action.new(name: :b, dependencies: [:a], block: ->(a:) { a + 1 })
#   action.needs({})     # => [:a]
#   action.needs(a: 1)   # => []
#   action.needs(b: 2)   # => [:a]
#
# @param results [Hash] results of all actions
#
# @return [Array<Symbol>] dependencies with no entry in +results+
def needs(results)
  dependencies.reject { |dependency| results.key?(dependency) }
end
# Tell whether every dependency of the action already has a result
#
# @example without dependencies
#   action = Action.new(name: :a, dependencies: [], block: -> { 1 })
#   action.ready?({}) # => true
#
# @example with dependencies
#   action = Action.new(name: :b, dependencies: [:a], block: ->(a:) { a + 1 })
#   action.ready?(a: 1) # => true
#   action.ready?(b: 2) # => false
#
# @param results [Hash] results of all actions
#
# @return [Boolean] true if no dependency is missing from +results+
def ready?(results)
  outstanding = needs(results)
  outstanding.empty?
end
end
# Raised when an action is added after the work queue has been closed
class ProcessingFinishedError < RuntimeError
end

# Raised when an action is added under a name that is already registered
class DuplicateActionError < ArgumentError
end

# Raised when an action names a dependency that has not been registered
class UnknownDependencyError < ArgumentError
end
# Set up an empty collection of actions
#
# Creates the action registry, the dependency index, the work queue, the
# lock with its condition variable, and the result store.
#
# @return [void]
def initialize
  @actions    = {}
  # Maps a dependency name to the set of action names waiting on it.
  @dependents = Hash.new { |index, name| index[name] = Set.new }
  @queue      = Thread::Queue.new
  @mutex      = Thread::Mutex.new
  @condition  = Thread::ConditionVariable.new
  @results    = {}
end
# Register an action and schedule it as soon as it can run
#
# The action is queued immediately when all of its dependencies already
# have results; otherwise it is recorded as waiting on each dependency
# that is still missing.
#
# @example
#   actions = Actions.new
#   actions.add(:a) { 1 }
#   actions.add(:b, [:a]) { |a:| a + 1 }
#
# @param name [Symbol] name of the action
# @param dependencies [Array<Symbol>] names of the actions this one needs
# @param block [Proc] block to execute the action
#
# @yield results of the action's dependencies, passed as keyword arguments
# @yieldreturn [Object] result of the action
#
# @raise [ArgumentError] if block is not given
# @raise [ProcessingFinishedError] if processing has finished
# @raise [DuplicateActionError] if the action already exists
# @raise [UnknownDependencyError] if any dependency is unknown
#
# @return [Actions] self
def add(name, dependencies = [], &block)
  raise ArgumentError, 'Block is required' if block.nil?

  @mutex.synchronize do
    assert_processing_not_finished
    assert_unique_action(name)
    assert_known_dependencies(dependencies)

    action = Action.new(name:, dependencies:, block:)
    @actions[name] = action

    pending = action.needs(@results)
    @queue << action if pending.empty?
    pending.each { |dependency| @dependents[dependency] << name }
  end

  self
end
# Process all actions in parallel
#
# Starts +num_threads+ worker threads that drain the queue, blocks until
# every registered action has a result, then closes the queue and waits
# for the workers to exit. Workers run with +abort_on_exception+ enabled,
# so an exception raised by an action is re-raised in the main thread.
#
# @example
#   actions = Actions.new
#   actions.add(:a) { 1 }
#   actions.add(:b, [:a]) { |a:| a + 1 }
#   actions.add(:c, [:b]) { |b:| b + 1 }
#   actions.process # => { a: 1, b: 2, c: 3 }
#
# @param num_threads [Integer] number of threads to use
#
# @raise [ArgumentError] if num_threads is not positive
#
# @return [Hash] results of all actions
def process(num_threads: 4)
  # Without at least one worker nothing drains the queue, so the wait
  # below would block forever whenever an action is still pending.
  raise ArgumentError, 'num_threads must be positive' unless num_threads.positive?

  workers = Array.new(num_threads) do
    Thread.new do
      Thread.current.abort_on_exception = true
      process_queue
    end
  end

  # wait for all actions to be processed
  @mutex.synchronize do
    @condition.wait(@mutex) until @results.size == @actions.size
    # Closed while holding the lock so it cannot interleave with #add,
    # which checks the closed flag under the same mutex.
    @queue.close
  end

  workers.each(&:join)
  @results
end
private
# Process actions from the queue
#
# Runs until the queue yields nil, which Thread::Queue#shift does once the
# queue is closed and empty. Each action is executed outside the lock; its
# result is then stored, its dependents are enqueued if ready, and waiters
# on the condition variable are woken, all while holding the lock.
#
# @return [void]
def process_queue
  while (action = @queue.shift)
    # Other workers write to @results while holding @mutex, so take the
    # dependency results under the same lock instead of reading the shared
    # hash unguarded during execution.
    inputs = @mutex.synchronize { @results.slice(*action.dependencies) }
    result = action.execute(inputs)

    @mutex.synchronize do
      @results[action.name] = result
      enqueue_ready_actions(action.name)
      @condition.broadcast
    end
  end
end
# Queue the dependents of a completed action that have become ready
#
# The completed action's entry is removed from the dependency index. Each
# action that was waiting on it is queued only if all of its dependencies
# now have results.
#
# @param completed_action [Symbol] name of the completed action
#
# @return [void]
def enqueue_ready_actions(completed_action)
  # Hash#delete returns nil when nothing was waiting on this action.
  waiting_names = @dependents.delete(completed_action).to_a

  @actions.fetch_values(*waiting_names).each do |candidate|
    next unless candidate.ready?(@results)

    @queue << candidate
  end
end
# Ensure the work queue is still open
#
# @raise [ProcessingFinishedError] if the queue has been closed
#
# @return [void]
def assert_processing_not_finished
  return unless @queue.closed?

  raise ProcessingFinishedError, 'Cannot add actions after processing has finished'
end
# Ensure no action is registered under the given name
#
# @param name [Symbol] name of the action
#
# @raise [DuplicateActionError] if an action with that name already exists
#
# @return [void]
def assert_unique_action(name)
  return unless @actions.key?(name)

  raise DuplicateActionError, "Action '#{name}' already exists"
end
# Ensure every dependency refers to an already registered action
#
# @param dependencies [Array<Symbol>] dependencies of the action
#
# @raise [UnknownDependencyError] if any dependency is not registered
#
# @return [void]
def assert_known_dependencies(dependencies)
  missing = dependencies.reject { |dependency| @actions.key?(dependency) }
  return if missing.empty?

  raise UnknownDependencyError, "Unknown dependencies: #{missing.join(', ')}"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment