Last active
August 29, 2015 14:18
-
-
Save gyulalaszlo/e1540f1896c7e5f94608 to your computer and use it in GitHub Desktop.
Example of processing a DAG (directed acyclic graph) of tasks in Ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A named unit of work in a dependency graph.
#
# A task carries a +key+, an optional block to execute and the keys of
# the tasks it depends on. +deps+ is the working list (entries can be
# removed through #remove_dependencies), while +all_dependencies+ keeps
# the complete list that was passed to #depends.
class Task
  attr_reader :key, :deps, :block, :all_dependencies

  # Creates a new task.
  #
  # +key+ identifies the task; the optional block is stored and later
  # invoked by #run with the arguments (config, input).
  def initialize(key, &to_do_block)
    @key = key
    @deps = []
    @all_dependencies = []
    # A captured block is already a Proc (or nil when none was given).
    @block = to_do_block
  end

  # Mark the dependencies of this task.
  #
  # Replaces any previously declared dependencies with +dep_task_keys+.
  # Returns the copy stored in +all_dependencies+.
  def depends(*dep_task_keys)
    @deps = dep_task_keys
    # Separate copy so that removing entries from +deps+ leaves the
    # complete list intact.
    @all_dependencies = dep_task_keys.dup
  end

  # Debug representation: the key followed by the current +deps+.
  def to_s
    "{ #{key} -> #{deps.join(", ")} }"
  end

  # Does this task have any dependencies left in +deps+?
  def has_dependencies?
    !deps.empty?
  end

  # Run this task.
  #
  # Prints a trace line, then calls the task's block with +config+ and
  # +input+. Returns the block's result, or nil when the task has no
  # block.
  def run(config, input)
    puts "Running #{key} with config #{config.inspect} and input #{input.inspect}"
    block.call(config, input) if block
  end

  # Removes all the dependencies in the array +deps_to_remove+ from the
  # dependencies of this task (+all_dependencies+ is not touched).
  #
  # Always returns the remaining +deps+; delete_if is used rather than
  # reject! because reject! returns nil when nothing was removed.
  def remove_dependencies(deps_to_remove)
    deps.delete_if { |dep_name| deps_to_remove.include?(dep_name) }
  end
end
# Process the given list of tasks with the provided configuration.
#
# Tasks are run in rounds. In each round every pending task whose
# +deps+ have all been completed in earlier rounds is run, in list
# order. A task receives its own entry from +config+ (or an empty hash
# when it has none) and a hash holding the outputs of the tasks listed
# in its +all_dependencies+.
#
# Completion is tracked locally, so this method does not alter
# +task_list+ or the dependency lists of the tasks and the same tasks
# can be processed again.
#
# Returns nil. Processing stops silently when no pending task is
# runnable, which means the graph contains a cycle or a task depends
# on a key that no task in the list provides.
def process(task_list, config = {})
  pending = task_list.dup
  # The storage for the output of the tasks, by task key
  task_output = {}
  # Keys of the tasks that have been run so far
  completed = []

  until pending.empty?
    # Split off the tasks we can work on now: the ones whose
    # dependencies have all been completed already.
    ready, pending = pending.partition do |task|
      task.deps.all? { |dep| completed.include?(dep) }
    end

    # Nothing runnable although tasks remain: the task graph is not
    # properly constructed, so stop.
    return if ready.empty?

    ready.each do |task|
      # Collect the outputs of the task's dependencies as its input
      task_inputs = task_output.select { |k, _v| task.all_dependencies.include?(k) }
      # Run the task and save its output
      task_output[task.key] = task.run(config.fetch(task.key, {}), task_inputs)
    end

    # Tasks finished in this round unblock their dependents in the next
    completed.concat(ready.map(&:key))
  end

  nil
end
# Shortcut for defining a task: builds a Task with the given +name+ as
# its key and forwards the optional block to it.
def task(name, &block)
  Task.new(name, &block)
end
# Example: build a small task graph and process it.

# Tasks without dependencies
sql1 = task :sql1 do |_config, input|
  puts "Hello from task sql 1 with #{input.inspect}"
  [1, 2, 3, 4, 5]
end

sql2 = task :sql2 do |_config, _input|
  [9, 10, 11, 12]
end

sql3 = task :sql3

# Consumes the output of :sql1 and appends one more element to it
proc_sql1 = task :proc_sql1 do |config, input|
  puts "got from sql1: #{input[:sql1].inspect} -- and config #{config.inspect}"
  input[:sql1] + ["more"]
end

# Tasks without a block
view1 = task :view1
view2 = task :view2
view3 = task :view3
delivery = task :delivery

# Wire up the dependencies
proc_sql1.depends(:sql1)
view1.depends(:proc_sql1, :sql2)
view2.depends(:sql3)
view3.depends(:view1, :view2)
delivery.depends(:view3)

# The list is deliberately not in dependency order
task_list = [view2, view3, delivery, sql1, sql2, sql3, proc_sql1, view1]

# Only :sql1 gets a configuration entry
process(task_list, {
  sql1: {
    username: "l",
    password: "lollol"
  }
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment