Last active
March 6, 2023 01:29
-
-
Save s1037989/330aba846a9ba06946a81c7425c3fee5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Mojolicious::Plugin::Minion::Schedule; | |
use Mojo::Base 'Mojolicious::Plugin', -signatures; | |
use Algorithm::Cron; | |
use Minion::Job; | |
use Mojo::Date; | |
use Mojo::JSON qw(j); | |
use Mojo::Util qw(monkey_patch sha1_sum); | |
monkey_patch 'Minion::Job', | |
guardian => sub ($job, $seconds=undef) { | |
$seconds //= ($job->info->{expires}||(time+30)) - time; | |
$job->minion->guard(sha1_sum(j([$job->task, $job->info->{args}])), $seconds); | |
}; | |
has schedule => sub { {} }; | |
sub register ($self, $app, $config) { | |
$app->hook(before_command => sub { $self->_load_schedule($app, $config, @_) }); | |
$app->hook(before_server_start => sub { $self->_load_schedule($app, $config, @_) }); | |
$app->helper('schedule.repeat' => sub ($c, $job) { $self->_repeat($job) }); | |
$app->helper('schedule.register' => sub ($c) { $self->_register($app) }); | |
$app->minion->worker->add_command(ping => sub ($worker) { warn 'pong!'; $worker->minion->app->log->warn('pong!') }); | |
} | |
sub _delay ($schedule) { | |
$schedule = '* * * * * *' if $schedule eq 'secondly'; | |
$schedule = '01 * * * * *' if $schedule eq 'minutely'; | |
$schedule = '* 01 * * * *' if $schedule eq 'hourly'; | |
$schedule = '* 01 * * * *' if $schedule eq 'daily'; | |
$schedule = '* 01 * * * *' if $schedule eq 'weekly'; | |
$schedule = '* 01 * * * *' if $schedule eq 'monthly'; | |
$schedule = '* 01 * * * *' if $schedule eq 'quarterly'; | |
$schedule = '* 01 * * * *' if $schedule eq 'yearly'; | |
if (my $cron = eval { Algorithm::Cron->new(base => 'utc', crontab => join ' ', $schedule) }) { | |
return $cron->next_time(time) - time; | |
} | |
else { | |
return Mojo::Date->new($schedule)->epoch - time; | |
} | |
} | |
sub _enqueue ($self, $app, $schedule, $task, $args) { | |
my $delay = _delay($schedule); | |
my $expire = 2 * $delay - 5; | |
return unless $delay > 0 && $expire > 0; | |
$app->log->debug(sprintf 'schedule: schedule %s to run in %d seconds at %s', $task, $delay, scalar localtime $delay + time); | |
$app->minion->enqueue($task => $args => {delay => $delay, expire => $expire, queue => 'schedule', notes => {schedule => $schedule}}); | |
} | |
sub _find_next { | |
my $app = shift; | |
my ($task, $args) = ref $_[0] eq 'ARRAY' ? @{$_[0]} : ($_[0]->task, $_[0]->args); | |
my $jobs = $app->minion->jobs({tasks => [$task], states => ['inactive', 'active']}); | |
my $next = undef; | |
while ($next = $jobs->next) { j($next->{args}) eq j($args) and last } | |
return unless $next; | |
$app->log->warn(sprintf 'schedule: task %s already scheduled to run in %s seconds at %s', $task, $next->{delayed} - time, scalar localtime $next->{delayed}); | |
return $next; | |
} | |
sub _load_schedule { | |
my ($self, $app, $config, $caller) = (shift, shift, shift, shift); | |
return unless $caller->isa('Minion::Command::minion') && $_[0][0] eq 'schedule'; | |
return if keys %{$self->schedule}; | |
foreach my $task (keys %$config) { | |
foreach my $schedule (keys %{$config->{$task}}) { | |
$self->_register($app, $caller, $schedule, $task, $_) for $config->{$task}->{$schedule}->@*; | |
} | |
} | |
} | |
sub _register ($self, $app, $caller, $schedule, $task, $args) { | |
my $sha = sha1_sum($task.j($args)); | |
$app->log->debug(sprintf 'schedule: already registered task %s on schedule %s', $task, $schedule) and return if grep { $_ && j($args) eq j($_->[1]) } $self->schedule->{$task}->{$sha}; | |
$app->log->debug(sprintf 'schedule: register task %s on schedule %s', $task, $schedule); | |
$self->schedule->{$task}->{$sha} = [$schedule, $args]; | |
$self->_enqueue($app, $schedule, $task, $args) unless _find_next($app, [$task, $args]); | |
} | |
sub _repeat ($self, $job) { | |
$self->_enqueue($job->app, $job->info->{notes}->{schedule}, $job->task, $job->args) unless _find_next($job->app, $job); | |
} | |
sub _unload_schedule ($self) { $self->schedule({}) } | |
1; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Minion::Command::minion::schedule; | |
use Mojo::Base 'Minion::Command::minion::worker', -signatures; | |
has description => 'Enqueue registered schedule jobs'; | |
has usage => sub { shift->extract_usage }; | |
sub run ($self) { $self->SUPER::run(@_, '-q', 'schedule') } | |
1; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package CheckLinks::Task::CheckLinks; | |
use Mojo::Base 'Mojolicious::Plugin', -signatures; | |
use Mojo::URL; | |
sub register ($self, $app, $config) { | |
$app->minion->add_task(check_links => \&_check_links); | |
} | |
sub _check_links ($job, $url) { | |
return $job->finish(sprintf 'Previous job is still active') unless $job->guardian; | |
$job->app->log->debug(sprintf '%s %s', $job->task, $url); | |
my @results; | |
my $ua = $job->app->ua; | |
my $res = $ua->get($url)->result; | |
push @results, [$url, $res->code]; | |
for my $link ($res->dom->find('a[href]')->map(attr => 'href')->each) { | |
my $abs = Mojo::URL->new($link)->to_abs(Mojo::URL->new($url)); | |
$res = $ua->head($abs)->result; | |
push @results, [$link, $res->code]; | |
} | |
$job->finish(\@results); | |
$job->app->schedule->repeat($job); | |
} | |
1; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment