Created
December 20, 2013 11:57
-
-
Save saillinux/8053779 to your computer and use it in GitHub Desktop.
DAG scheduler in Perl
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings; | |
use Graph; | |
use Data::Dumper; | |
use JSON; | |
use Capture::Tiny ':all'; | |
# Task lifecycle states, stored in each node's 'state' field.
use constant WAITING => 0;    # initial state of every task
use constant RUNNING => 1;    # set by the scheduler just before the action runs
use constant DONE    => 2;    # action finished and the node's 'exit' is false
use constant FAIL    => 3;    # action finished and the node's 'exit' is true
# Task table, keyed by task name. Each row below is
#   [ name, action, params... ]
# where action is either the name of an external command or a reference to
# one of the subs in this file. Every task starts out WAITING with zeroed
# start/end timestamps.
my %nodes = map {
    my ($name, $action, @params) = @{$_};
    (
        $name => {
            "action"     => $action,
            "params"     => [@params],
            "start_time" => 0,
            "end_time"   => 0,
            "state"      => WAITING,
        }
    );
} (
    [ "Task1", "curl", "http://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.quote%20where%20symbol%20in%20(%22TWTR%22%2C%22FB%22%2C%22TSLA%22%2C%22XOM%22)&format=json&diagnostics=true&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys&callback=" ],
    [ "Task2", \&retrieve_stock, "TWTR", "Change" ],
    [ "Task3", \&retrieve_stock, "FB",   "Change" ],
    [ "Task4", \&aggregator ],
);

# Dependency table: each task maps to the tasks that run after it.
my %edges = (
    "Task1" => [ qw(Task2 Task3) ],
    "Task2" => [ qw(Task4) ],
    "Task3" => [ qw(Task4) ],
    "Task4" => [],
);
# Build the task graph from %nodes and %edges, validate it and run it.
my $g0 = Graph->new; # there is a song called zero g love in Macross

# add each task to the graph as node
$g0->add_vertex($_) for keys %nodes;

# connect each task
# Adding an edge also creates any vertex it mentions, so an edge that names a
# task missing from %nodes would hand the scheduler a task with no definition.
# Reject such edges here instead of crashing later.
foreach my $task (keys %edges) {
    foreach my $name ($task, @{ $edges{$task} }) {
        next if exists $nodes{$name};
        print "FATAL: The edge list refers to an undefined task [$name] so exiting...\n";
        exit 1;
    }
    $g0->add_edge($task, $_) for @{ $edges{$task} };
}

print "INFO: The graph is $g0\n";

validate($g0);
scheduler($g0);

# print Dumper(\%nodes);
exit(0);
# Check that the graph can be scheduled; terminate the program otherwise.
#
# Arguments:
#   $DAG - graph object providing is_dag, vertices and in_degree
#
# Returns a true value when the graph is a DAG with at most one head (a vertex
# with no incoming edges). On any violation a FATAL line is printed and the
# process exits with a non-zero status.
sub validate {
    my $DAG = shift;

    # is_dag already covers "directed" and "acyclic", so a separate cycle
    # check after it could never fire.
    unless ($DAG->is_dag) {
        print "FATAL: This graph is not a directed and acyclic graph so exiting...\n";
        exit 1;
    }

    # Heads are the vertices nothing points at.
    my @heads = grep { !$DAG->in_degree($_) } $DAG->vertices;

    if (@heads > 1) {
        print "FATAL: There is more than one execution start points\n";
        exit 1;
    }

    return 1;
}
# Run every task of the graph once, in topological order.
#
# Arguments:
#   $DAG - graph object providing topological_sort, in_degree and predecessors;
#          every vertex must be a key of %nodes
#
# For each task:
#   * every predecessor must already be DONE; a FAIL or any other state is
#     fatal and ends the process with a non-zero status;
#   * the node's state, start_time and end_time are updated around the run;
#   * a CODE action is called as $action->($task, { preds => [...],
#     params => [...] }) and is expected to set the node's 'exit' itself;
#   * any other action is run as an external command through system, and the
#     three values returned by capture are stored as stdout, stderr and exit;
#   * a false 'exit' marks the task DONE, a true one marks it FAIL.
#
# A task that fails with no successor to notice it is reported after the loop.
sub scheduler {
    my $DAG = shift;

    my @ts = $DAG->topological_sort;

    foreach my $task ( @ts ) {
        my @predecessors = $DAG->predecessors($task);

        if ( $DAG->in_degree($task) ) {
            print "INFO: check whether predecessors of [$task] were executed successfully\n";
            foreach my $predecessor ( @predecessors ) {
                if ( $nodes{$predecessor}->{'state'} == FAIL ) {
                    print "FATAL: The predecessor [$predecessor] of $task was failed so exiting...\n";
                    exit 1;
                } elsif ( $nodes{$predecessor}->{'state'} == DONE ) {
                    print "INFO: The predecessor [$predecessor] of $task ran successful so continuing...\n";
                } else {
                    print "FATAL: something went wrong exiting...\n";
                    exit 1;
                }
            }
        } else {
            print "INFO: $task is the head, starting this task now\n";
        }

        my $node = $nodes{$task};

        print "INFO: running task [$task]\n";
        $node->{'state'}      = RUNNING;
        $node->{'start_time'} = time();

        my $action = $node->{'action'};
        my @params = @{ $node->{'params'} };

        if ( ref $action eq 'CODE' ) {
            # A dying action must not take the scheduler down with the node
            # stuck in RUNNING; record it as an ordinary task failure.
            my $ok = eval {
                $action->($task, {
                    "preds"  => \@predecessors,
                    "params" => \@params,
                });
                1;
            };
            unless ($ok) {
                my $error = $@ || "unknown error\n";
                warn "ERROR: task [$task] died: $error";
                $node->{'stderr'} = $error;
                $node->{'exit'}   = 1;
            }
        } else {
            @$node{'stdout', 'stderr', 'exit'} = capture {
                system $action, @params;
            };
        }

        $node->{'end_time'} = time();
        $node->{'state'}    = $node->{'exit'} ? FAIL : DONE;
    }

    # Failures of tasks with successors were caught above; whatever is still
    # FAIL here had nothing downstream to report it.
    my @failed = grep { $nodes{$_}->{'state'} == FAIL } @ts;
    if (@failed) {
        print "FATAL: The task(s) [@failed] failed so exiting...\n";
        exit 1;
    }

    return;
}
# Pick one field of one stock quote out of the JSON text captured by the first
# predecessor task and store it as this task's result.
#
# Arguments:
#   $self - name of the task being run (a key of %nodes)
#   $args - hashref with
#             preds  => arrayref of predecessor task names (only the first is read)
#             params => arrayref [ $stock_symbol, $field_name ]
#
# Effects on $nodes{$self}:
#   exit   - 0 when a defined value was found for the symbol/field, 1 otherwise
#   stdout - the value found (left untouched on failure)
#
# Missing input, malformed JSON or an unexpected document shape count as a
# task failure (exit 1) rather than an exception.
sub retrieve_stock {
    my ($self, $args) = @_;

    my $source          = $args->{"preds"}[0];
    my ($stock, $field) = @{ $args->{"params"} || [] };
    my $node            = $nodes{$self};

    # Assume failure; flipped to 0 only once a value has been stored.
    $node->{'exit'} = 1;

    return unless defined $source && defined $stock && defined $field;

    my $raw = $nodes{$source}->{'stdout'};
    return unless defined $raw && length $raw;

    # decode_json throws on malformed text, and walking the document throws
    # if an intermediate level is not a hash; both mean "no usable data".
    my $quote = eval { decode_json($raw)->{'query'}{'results'}{'quote'} };

    # Accept a list of quotes or a single quote object; anything else yields
    # nothing to search.
    my @quotes = ref $quote eq 'ARRAY' ? @{$quote}
               : ref $quote eq 'HASH'  ? ($quote)
               :                         ();

    foreach my $entry ( @quotes ) {
        next unless ref $entry eq 'HASH';
        next unless defined $entry->{'symbol'} && $entry->{'symbol'} eq $stock;
        next unless defined $entry->{$field};

        $node->{'stdout'} = $entry->{$field};
        $node->{'exit'}   = 0;
    }

    return;
}
# Compare the values produced by the predecessor tasks and report the stock
# with the numerically largest one.
#
# Arguments:
#   $self - name of the task being run (a key of %nodes)
#   $args - hashref; only preds (arrayref of predecessor task names) is read
#
# For each predecessor the stock symbol is taken from its first param and the
# value from its stdout. Predecessors lacking either are skipped.
#
# Effects on $nodes{$self}:
#   stdout - the winning stock symbol
#   exit   - 0 when a winner was found, 1 when there was nothing to compare
sub aggregator {
    my ($self, $args) = @_;

    my %changes = ();
    foreach my $task ( @{ $args->{"preds"} || [] } ) {
        my $stock  = $nodes{$task}->{'params'}[0];
        my $change = $nodes{$task}->{'stdout'};
        next unless defined $stock && defined $change;
        $changes{$stock} = $change;
    }

    unless (%changes) {
        print "ERROR: no stock changes available to aggregate\n";
        $nodes{$self}->{'exit'} = 1;
        return;
    }

    # Largest change first; ties are broken by symbol so the result does not
    # depend on hash ordering.
    my ($winner) = sort { $changes{$b} <=> $changes{$a} or $a cmp $b } keys %changes;

    $nodes{$self}->{'stdout'} = $winner;
    $nodes{$self}->{'exit'}   = 0;

    print "OUTPUT: The winner is $winner by change $changes{$winner}\n";
    return;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment