Last active
March 11, 2019 13:24
-
-
Save ggl/326d6d01fdae2c91be2d19cb7f26dbf5 to your computer and use it in GitHub Desktop.
Mojo::IOLoop::Subprocess task runner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl
use strict;
use warnings;

use Data::Dumper;
use Mojo::Log;
use Mojo::IOLoop;
use Mojo::IOLoop::Subprocess;

## Task runner built on Mojo::IOLoop::Subprocess.
##
## Every $poll_cycle seconds the queue is scanned and up to $max_child tasks
## are scheduled, staggered one second apart. Each task runs in a forked
## child; the parent removes a task from the queue once its child reports
## success. A task whose child fails (or cannot be forked) only gives its
## slot back and stays in the queue, so a later poll picks it up again.

my %procs;      # pid  => Mojo::IOLoop::Subprocess object of a running child
my %tasks;      # task => pid of the child working on that task
my %pending;    # task => epoch time at which the task was scheduled to start
my @queue      = qw(1 2 3 4 5 6 7 8 9);
my $max_child  = 3;     # maximum number of concurrently running children
my $poll_cycle = 10;    # seconds between two scans of the queue

my $log = Mojo::Log->new(handle => *STDERR, level => 'debug')
    ->info("Worker started, pid: $$");

## Forget all bookkeeping for $task (and its child $pid, when known) so the
## worker slot becomes available again.
sub _release_task {
    my ($task, $pid) = @_;

    delete $procs{$pid} if defined $pid;
    delete $tasks{$task};
    delete $pending{$task};

    return;
}

## Fork a child for $task and register it in %procs / %tasks.
sub _spawn_task {
    my ($task) = @_;

    $log->info("New task: $task");
    my $proc = Mojo::IOLoop::Subprocess->new;

    ## The child announces its task as soon as it runs. This registers the
    ## pid even when it is not yet known right after run() returns.
    $proc->on(progress => sub {
        my ($sp, $reported) = @_;
        $log->debug('pid: '.$sp->pid.', task: '.$reported);
        $procs{$sp->pid}  = $sp;
        $tasks{$reported} = $sp->pid;
    });

    $proc->run(
        ## Runs in the child process.
        sub {
            my $sp = shift;
            $log->debug('Spawned child pid: '.$sp->pid.', $$: '.$$);
            $sp->progress($task);
            eval {
                sleep(int(rand(30)));
                1;
            } or $log->error("Task $task has failed", $@);
            return $task;
        },
        ## Runs in the parent once the child has finished.
        sub {
            my ($sp, $err, $result) = @_;
            my $pid = $sp->pid;
            $log->debug('Parent pid: '.($pid // '').', $$: '.$$);

            ## Always give the slot back. The child is not signalled here:
            ## it has already exited, and its pid may since have been reused
            ## by an unrelated process. The task id comes from the enclosing
            ## scope because the child returns nothing when it fails.
            _release_task($task, $pid);

            if ($err) {
                ## Leave the task in the queue so a later poll retries it.
                $log->error($err);
                return 1;
            }

            $log->info("Task ".$task." done.");
            ## Task ids are strings, so compare them as strings.
            @queue = grep { $_ ne $task } @queue;
            return 1;
        }
    );

    $log->debug(Dumper [keys %procs], \%tasks);

    ## When the pid is already available, register the child right away.
    if (my $pid = $proc->pid) {
        $procs{$pid}  = $proc;
        $tasks{$task} = $pid;
    }

    return;
}

## Scan the queue and schedule the next tasks while slots are free.
sub _schedule_tasks {
    my $active = scalar keys %procs;
    $log->info('Tasks queued: '.(scalar(@queue) - $active).', active: '.$active.'/'.$max_child);

    ## No free slot: nothing can be scheduled during this cycle.
    return if $active >= $max_child;

    my $scheduled = 0;
    for my $queued (@queue) {
        ## skip pending tasks
        next if $pending{$queued};
        ## skip active tasks
        next if $tasks{$queued};
        ## never schedule more than $max_child tasks per cycle
        last if $scheduled >= $max_child;

        ## Private copy for the timer closure, so it does not hold an alias
        ## into @queue, which is rewritten when tasks complete.
        my $task = $queued;

        ## run queued tasks in order
        ## guard against running queued tasks multiple times when max_child >= poll_cycle
        ## by saving pending tasks in a hash and clearing them once they become active
        $pending{$task} ||= time;
        Mojo::IOLoop->timer(++$scheduled => sub {
            if (scalar(keys %procs) < $max_child) {
                _spawn_task($task);
            }
            else {
                ## No slot after all: make the task eligible again.
                delete $pending{$task};
            }
        });
    }

    return;
}

Mojo::IOLoop->recurring($poll_cycle => sub { _schedule_tasks() });

## start event loop
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment