Skip to content

Instantly share code, notes, and snippets.

@gyulalaszlo
Last active August 29, 2015 14:18
Show Gist options
  • Save gyulalaszlo/e1540f1896c7e5f94608 to your computer and use it in GitHub Desktop.
Save gyulalaszlo/e1540f1896c7e5f94608 to your computer and use it in GitHub Desktop.
Example of processing a DAG (directed acyclic graph) of tasks in Ruby
# A named unit of work in a task graph.
#
# Each task has a +key+, a list of task keys it depends on, and an optional
# block that does the actual work. +deps+ holds the dependencies that are
# still unresolved (it can be shrunk with #remove_dependencies), while
# +all_dependencies+ keeps the complete list last given to #depends.
class Task
  attr_reader :key, :deps, :block, :all_dependencies

  # Creates a new task.
  #
  # +key+         - the identifier other tasks use to depend on this one.
  # +to_do_block+ - optional work; #run calls it with (config, input).
  def initialize(key, &to_do_block)
    @key = key
    @deps = []
    @all_dependencies = []
    # A captured &block is already a Proc, so no conversion is needed.
    # Assigning unconditionally also initializes @block to nil when absent.
    @block = to_do_block
  end

  # Sets the keys of the tasks this task depends on, replacing any previous
  # dependencies. Keys may be passed individually or as arrays, so both
  # `depends(:a, :b)` and `depends([:a, :b])` work. Without flattening, an
  # array argument would become a single unresolvable dependency.
  def depends(*dep_task_keys)
    keys = dep_task_keys.flatten
    @deps = keys
    @all_dependencies = keys.dup
  end

  # Debug representation, e.g. "{ view1 -> proc_sql1, sql2 }".
  def to_s
    "{ #{key} -> #{deps.join(", ")} }"
  end

  # True while this task still has unresolved dependencies in +deps+.
  def has_dependencies?
    !deps.empty?
  end

  # Logs the call, then runs this task's block with +config+ and +input+.
  # Returns the block's result, or nil when the task has no block.
  def run(config, input)
    puts "Running #{key} with config #{config.inspect} and input #{input.inspect}"
    block.call(config, input) if block
  end

  # Removes every key in +deps_to_remove+ from the unresolved dependencies
  # (+deps+). +all_dependencies+ is left untouched.
  # Like Array#reject!, returns +deps+ if anything was removed, nil otherwise.
  def remove_dependencies(deps_to_remove)
    deps.reject! { |dep_name| deps_to_remove.include?(dep_name) }
  end
end
# Process the given list of tasks with the provided configuration.
#
# Tasks run in dependency order. Each pass runs every task whose remaining
# dependencies have all completed, in +task_list+ order. Each task receives
# the outputs of the already-run tasks listed in its +all_dependencies+.
#
# +task_list+ - Array of tasks. Neither the array nor the tasks are modified,
#               so the same tasks can be processed again.
# +config+    - Hash of task key => config passed to that task (tasks without
#               an entry get {}).
#
# Returns a Hash of task key => output for every task that ran. If the graph
# contains a cycle, or a task depends on a key no task provides, processing
# stops early and only the tasks that could run are included.
def process( task_list, config={} )
  # Track unresolved dependencies in local copies instead of mutating the
  # tasks themselves; mutating them would break a second run.
  pending = task_list.map { |task| [task, task.deps.dup] }
  # The storage for the output of the tasks
  task_output = {}
  loop do
    return task_output if pending.empty?
    # The tasks we can work on now are the ones without unresolved
    # dependencies; partition keeps the original order within both halves.
    ready, pending = pending.partition { |_task, deps| deps.empty? }
    # Nothing can run: the remaining tasks form a cycle or depend on keys
    # that no task provides.
    return task_output if ready.empty?
    ready.each do |task, _deps|
      # Collect the outputs of this task's dependencies as its input
      task_inputs = task_output.select { |k, _v| task.all_dependencies.include?(k) }
      task_output[task.key] = task.run( config.fetch(task.key, {}), task_inputs )
    end
    # Remove the dependencies resolved in this pass
    completed = ready.map { |task, _deps| task.key }
    pending.each { |_task, deps| deps.reject! { |dep| completed.include?(dep) } }
  end
end
# DSL-style helper: `task :name do |config, input| ... end` builds a Task
# called +task_name+ whose work is the given block (if one is supplied).
def task(task_name, &work)
  Task.new(task_name, &work)
end
# --- Example pipeline -------------------------------------------------------
# Source tasks: sql1 and sql2 produce rows, sql3 does no work of its own.
sql1 = task(:sql1) do |_config, input|
  puts "Hello from task sql 1 with #{input.inspect}"
  [1, 2, 3, 4, 5]
end
sql2 = task(:sql2) { |_config, _input| [9, 10, 11, 12] }
sql3 = task(:sql3)

# Processing step: appends a marker to the rows produced by sql1.
proc_sql1 = task(:proc_sql1) do |config, input|
  puts "got from sql1: #{input[:sql1].inspect} -- and config #{config.inspect}"
  input[:sql1] + ["more"]
end

# Block-less tasks that only join the graph together.
view1, view2, view3, delivery = [:view1, :view2, :view3, :delivery].map { |name| task(name) }

# Wire up the edges of the graph: dependent task => keys it needs first.
{
  proc_sql1 => [:sql1],
  view1     => [:proc_sql1, :sql2],
  view2     => [:sql3],
  view3     => [:view1, :view2],
  delivery  => [:view3],
}.each { |dependent, prerequisites| dependent.depends(*prerequisites) }

# Deliberately unordered; process works out the execution order.
task_list = [view2, view3, delivery, sql1, sql2, sql3, proc_sql1, view1]
sql1_config = { username: "l", password: "lollol" }
process(task_list, { sql1: sql1_config })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment