Skip to content

Instantly share code, notes, and snippets.

@mmterpstra
Last active July 30, 2020 01:08
Show Gist options
  • Save mmterpstra/830f66c8bf6ab77ff3c1e000beb42ecb to your computer and use it in GitHub Desktop.
Save mmterpstra/830f66c8bf6ab77ff3c1e000beb42ecb to your computer and use it in GitHub Desktop.
submitd.pl queue crawlin since 2019
#!/usr/bin/perl
use strict;
use warnings;
use Proc::Daemon;
use Proc::PID::File;
use Getopt::Long;
use Log::Log4perl qw(:easy);
use File::Basename;
use Data::Dumper;
our $logger;
main();
sub main {
#use Proc::Daemon;
my $subcommand = shift @ARGV || "";
#cli processing
my $opts = {
'use' => "Use:$0 [start|status|kill|restart] options
Options
--verbose toggles verbosity
--workdir PATH changes workdir for deamon
--basename STRING changes basename for deamon
--watchlist FILE list of submit.sh scripts to watch
--ctype slurm change the clustertype for now
only slurm support
--retry INT change the retry times
--unfinished STR string to match files in submit dir to
test for unfinished jobs
--tofinish STR string to match files in submit dir to
test for jobs to finish
--finished STR string to match files in submit dir to
test for finished submit.sh. blocks
if finished>=tofinish",
'get' => "",#populated below
'v' => 0,
'wd' => './',
'wl' => 'submit.list',
'base' => $0,
'retry' => 3,
'ctype' => 'slurm',
'finished' => 'Cleanup_*.finished',
'tofinish' => 'Cleanup_*.sh',
'unfinished' => '*.started',
};
$opts -> {'get'} = {"verbose" => \$opts -> {'v'},
"workdir=s" => \$opts -> {'wd'},
"basename=s" => \$opts -> {'base'},
"watchlist=s" => \$opts -> {'wl'},
"ctype=s" => \$opts -> {'ctype'},
"retry=s" => \$opts -> {'retry'},
"unfinished=s" => \$opts -> {'unfinished'},
"tofinish=s" => \$opts -> {'tofinish'},
"finished=s" => \$opts -> {'finished'}
};
GetOptions (%{$opts -> {'get'}})
or die("Error in command line arguments\n");
#start logging stuff
my $logLevel = $INFO;
$logLevel = $DEBUG if ($opts -> {'v'});
Log::Log4perl->easy_init(
{
level => $logLevel,
file => "STDERR",
layout => '%d L:%L %p> %m%n'
},
);
$logger = Log::Log4perl::get_logger();
#init factors
my $daemon = Proc::Daemon->new(
work_dir => $opts -> {'wd'},
child_STDOUT => ">>".$opts -> {'base'}."_out.txt",
child_STDERR => ">>".$opts -> {'base'}."_err.txt",
pid_file => $opts -> {'wd'}."/".$opts -> {'base'}.".pid"
);
if($subcommand eq "start"){
if (Proc::PID::File -> running(dir=>$opts -> {'wd'},name=>$opts -> {'basename'} )) {
$logger->info( "$0: Worker already running!!".localtime(time()));
exit(0);
}
my $ChildPID = $daemon->Init;
unless ( $ChildPID ) {
# code executed only by the child ...
$logger->info("Submit Daemon Checking...");
my $deamondata = {};
while(1){
$logger->info("Submit Daemon work");
$deamondata = SubmitDaemonWork($deamondata,$opts);
$logger->info("Submit Daemon Sleep");
sleep(3600);#should be once every hour?;
}
$logger->info("Submit Daemon run Ended..");
}
#$Kid_2_PID = $daemon->Init( {
# work_dir => '/data/umcg-mterpstra/umcg-oncogenetics/git/molgenis-c5-TumorNormal',
# exec_command => 'perl /home/my_script.pl',
# } );
#warn "$0: Succes ";
#sleep(20);
#warn "$0: Ended ";
}elsif($subcommand eq "status"){
my $pid = $daemon->Status();
if($pid){
$logger->info( "Running with $pid");
}else {
$logger->error("Not running with any pid. Return code: $pid");
}
}elsif($subcommand eq "kill"){
my $stopped = $daemon->Kill_Daemon();
if($stopped){
$logger->info("sucessfully killed");
}else{
$logger->error("failed killing worker return value:$stopped");
}
}elsif($subcommand eq "restart"){
#just kill and restart
my $stopped = $daemon->Kill_Daemon();
if($stopped){
$logger->info("sucessfully killed");
}else{
$logger->info("failed killing worker return value:$stopped");
}
sleep(1);
if (Proc::PID::File -> running(dir=>$opts -> {'wd'},name=>$opts -> {'basename'} )) {
$logger->info( "$0: Worker already running!!".localtime(time()));
exit(0);
}
my $ChildPID = $daemon->Init;
unless ( $ChildPID ) {
# code executed only by the child ...
$logger->info("Submit Daemon Checking...");
my $deamondata = {};
while(1){
$deamondata = SubmitDaemonWork($deamondata,$opts);
}
$logger->info("Submit Daemon Sleep");
sleep(3600);#should be once every hour?;
$logger->info("Submit Daemon run Ended..");
}
#$Kid_2_PID = $daemon->Init( {
# work_dir => '/data/umcg-mterpstra/umcg-oncogenetics/git/molgenis-c5-TumorNormal',
# exec_command => 'perl /home/my_script.pl',
# } );
#warn "$0: Succes ";
#sleep(20);
#warn "$0: Ended ";
}else{
$logger->info("invalid subcommand\n" . $opts -> {'use'});
}
}
sub GetClusterConfig {
my $self = shift(@_);# or $logger->error("ClusterConfig sub error");
my $opts = shift(@_);# or $logger->error("ClusterConfig sub error");
$self = {
'queue' => 'regular',
'ctype' => $opts -> {'ctype'},
'slurm' => {
'jobsubmitcmd' => 'sbatch',
'queuestatuscmd' => 'squeue -u $USER -o "%i|%P|%q|%j|%u|%t|%M|%R|%S|%p|%c|%m"',
'queuestatusconversion' => { #covert be like JOBID|PARTITION|QOS|NAME|USER|ST|TIME|NODELIST(REASON)|START_TIME|PRIORITY|MIN_CPUS|MIN_MEMORY
'NAME' => 'name',
'JOBID' => 'id',
'PARTITION' => 'partition',
'QOS' => 'qos',
'STATUS' => 'status',
'USER' => 'user',
'ST' => 'status',
'TIME' => 'runtime',
'NODELIST(REASON)' => 'nodeOrReason',
'START_TIME' => 'starttime',
'PRIORITY' => 'prio',
'MIN_CPUS' => 'jobcpu',
'MIN_MEMORY' => 'jobmem',
},
'runsubmitcmd' => 'set -e && set -x && set -o pipefail && bash ',
'queuelimitscmd' => 'sacctmgr show qos format=Name%15,Priority,UsageFactor,GrpTRES%30,GrpSubmit,MaxTRESPerUser%30,MaxSubmitJobsPerUser,Preempt%45,MaxWallDurationPerJob',
},
};
return $self;
};
sub GetProjectsSubmit {
my $submit = shift @_ or $logger->fatal("no input ".@_);;
my $projects;
my $taskname = undef;
$logger->debug("reading submit.sh '".$submit."'");
open(my $subin,'<', $submit ) or $logger->fatal("Read error of '".$submit."'!!!\n $!");
while(not(defined($taskname)) and my $line = <$subin>){
chomp $line;
#$logger->debug("Processing line '$line'");
next if (not($line =~ m/\s*tname=\"\w+"/));
$logger->debug("Processing line '$line'");
my $taskdir = dirname($submit);
(undef,$taskname,undef)=split('"',$line);
my $taskbase = substr($taskname,0,length($taskname)-1);
$logger->debug("parsing $taskname matched with $taskdir/$taskbase*.sh");
for my $task (<"$taskdir/$taskbase*.sh">){
my $project;
open(my $taskin,'<', $task ) or $logger->fatal("Read error of '".$task."'!!!\n $!");
$logger->debug("Processing file '$task'");
while(not(defined($project)) and my $line = <$taskin>){
if($line =~ m/^project=/){
$logger->debug("protocols match on '$line'");
(undef,$project,undef) = split('"',$line);
$logger->debug("Processing line '$line'");
push(@{$projects},$project);
$logger->debug("Found $project in $task");
}
}
}
$logger->debug("parsing $_");
}
return $projects;
}
sub GetWatchlistConfig {
my $self = shift(@_);
my $opts = shift(@_);
$logger->debug("reading watchlist ".$opts -> {'wl'});
open(my $wl,'<', $opts -> {'wl'} ) or $logger->fatal("Read error of '".$opts -> {'wl'}."'!!!\n $!");
while(<$wl>){
chomp;
push @{$self -> {'watchlist'}},{'submit' => $_,'projects' => GetProjectsSubmit($_),'retry' => $opts -> {'retry'}};
}
return $self;
}
sub UpdateWatchlistStatus{
my $self = shift @_ or ($logger->fatal("invalid deamondata input") && die "");
$self = UpdateQueueStatus($self);
$self = UpdateSubmitStatus($self);
return $self;
}
sub SubmitNotFinishedAndRemoveFinished {
my $self = shift @_ or ($logger->fatal("invalid deamondata input") && die "");
my $opts = shift @_ or ($logger->fatal("invalid opts input") && die "");
#resubmit when a jobs are inactive and a round of retry is still available and
# dirname(submit.sh) finished >= dirname(submit.sh) tofinish
my @watchListNew;
for my $submit (@{$self -> {'watchlist'}}){
my $submitdir = dirname($submit -> {'submit'});
my $tofinish = $opts -> {'tofinish'};
my $finished = $opts -> {'finished'};
my @tofinishlist=<"$submitdir/$tofinish">;
my @finishedlist=<"$submitdir/$finished">;
my $tofinishcount = scalar(@tofinishlist);
my $finishedcount = scalar(@finishedlist);
if($submit -> {'activejobcount'} == 0 &&
$submit -> {'retry'}){
#
#$logger->debug("results '".$tofinishcount."'"." '".$finishedcount."'");
if($finishedcount < $tofinishcount){
$submit -> {'retry'}--;
my $submitcmd = $self -> {$self -> {'ctype'}} -> {'runsubmitcmd'}.
$submit -> {'submit'}.'&>/dev/stdout | tail -n 50 ';
$logger->info("Resubmitting '".$submit -> {'submit'}."'");
my $ret;
@{$ret} = CmdRunnerNoBreak($submitcmd);
$logger->info("Submit results:\n\t'".join("\t",@{$ret})."'");
}else{
$logger->info("Submit not needed anymore :\n\t'".Dumper($submit)."'");
}
}
#do not add to watchlist if finished
push(@watchListNew,$submit)if($finishedcount < $tofinishcount);
}
@{$self -> {'watchlist'}} = @watchListNew;
return $self;
}
sub UpdateSubmitStatus{
my $self = shift @_ or ($logger->fatal("invalid deamondata input") && die "");
for my $submit (@{$self -> {'watchlist'}}){
my $count = 0;
for my $job (@{$self -> {'queuestatus'}}){
for my $project (@{$submit -> {'projects'}}){
$count++ if(index($job -> {'name'},$project) == 0);
}
}
$submit -> {'activejobcount'} = $count;
}
return $self;
}
sub UpdateQueueStatus {
my $self = shift @_ or ($logger->fatal("invalid deamondata input") && die "");
my $queueres = [];
#catch for failing if something times out
while(not(defined($queueres -> [0]) and $queueres -> [0] =~ m/NAME/)){
@{$queueres} = CmdRunner($self -> { $self -> {'ctype'} } -> {'queuestatuscmd'});
}
chomp($queueres -> [0]);
my $queueheader;@{$queueheader} = split('\|',shift(@{$queueres}));
for my $queuerow (@{$queueres}){
my $queueinfo;
chomp $queuerow;
my @vals = split('\|',$queuerow);
my $i = 0;
#aaargh this does a map to crate a json like object with header name conversion in the fly.
map{ my $val = $_;
if(defined($self -> {$self -> {'ctype'}} -> {'queuestatusconversion'} -> {$queueheader -> [$i]})){
$queueinfo -> {$self -> {$self -> {'ctype'}} -> {'queuestatusconversion'} -> {$queueheader -> [$i]}} = $val;
}else{
$queueinfo -> {'!!'.$queueheader -> [$i]} = $val;
};
$i++;}(@vals);
push(@{$self -> {'queuestatus'}}, $queueinfo);
}
#$logger->fatal("queue results ".Dumper($queueres));
return $self;
}
sub CmdRunner {
my $ret;
my $cmd = join(" ",@_);
$logger->debug(" system call:'". $cmd."'");
@{$ret} = `($cmd )2>&1`;
if ($? == -1) {
$logger->fatal("failed to execute: $!\n");
}elsif ($? & 127) {
$logger->fatal( sprintf "child died with signal %d, %s coredump",
($? & 127), ($? & 128) ? 'with' : 'without');
}elsif ($? != 0) {
$logger->fatal( sprintf "child died with signal %d, %s coredump",
($? & 127), ($? & 128) ? 'with' : 'without');
}else {
$logger->debug( sprintf "child exited with value %d\n", $? >> 8 );
# warn localtime( time() ). " [INFO] " . sprintf "child exited with value %d\n", $? >> 8;
}
return @{$ret};
}
sub CmdRunnerNoBreak {
my $ret;
my $cmd = join(" ",@_);
$logger->debug(" system call:'". $cmd."'");
@{$ret} = `($cmd )2>&1`;
if ($? == -1) {
$logger->debug("failed to execute: $!\n");
}elsif ($? & 127) {
$logger->debug( sprintf "child died with signal %d, %s coredump",
($? & 127), ($? & 128) ? 'with' : 'without');
}elsif ($? != 0) {
$logger->debug( sprintf "child died with signal %d, %s coredump",
($? & 127), ($? & 128) ? 'with' : 'without');
}else {
$logger->debug( sprintf "child exited with value %d\n", $? >> 8 );
# warn localtime( time() ). " [INFO] " . sprintf "child exited with value %d\n", $? >> 8;
}
return @{$ret};
}
sub SubmitDaemonWork {
my $self = shift @_ or ($logger->fatal("invalid deamondata input") && die "");
my $opts = shift @_ or ($logger->fatal("invalid option input") && die "");
$self = GetClusterConfig($self,$opts) if(not(defined($self -> {'ctype'})));
$self = GetWatchlistConfig($self,$opts) if(not(defined($self -> {'watchlist'})));
$logger->debug("config dump after GetWatchlistConfig: " . Dumper($self));
$self = UpdateWatchlistStatus($self);
$logger->debug("config dump after UpdateJobStatus: " . Dumper($self));
$self = SubmitNotFinishedAndRemoveFinished($self,$opts);
return $self;
}
sub main_old {
# Daemonize
Proc::Daemon::Init(
work_dir => '/data/umcg-mterpstra/umcg-oncogenetics/git/molgenis-c5-TumorNormal',
child_STDOUT => '>>log.txt',
child_STDERR => '>>debug.txt');
# If already running, then exit
if (Proc::PID::File->running()) {
warn "$0: already running at ".localtime(time());
exit(0);
}
# Perform initializes here
warn "$0: init at ".localtime(time());
# Enter loop to do work
while(1) {
# Do whatcha gotta do
warn "$0: run stuff at ".localtime(time());
sleep(2);
exit 0;
}
}

originally posted on https://gist.github.com/mmterpstra/830f66c8bf6ab77ff3c1e000beb42ecb

Description

submitd.pl a project based re-submission daemon for molgenis-compute when working on slurm clusters.

todo

Maybe hack in different sheduler support, nicer logging, testing on calculon and mailing on project Restart/finish once a day??

use

Use:submitd.pl [start|status|kill|restart] options
	Options
		--verbose	toggles verbosity
		--workdir PATH	changes workdir for deamon
		--basename STRING	changes basename for deamon
		--watchlist FILE	list of submit.sh scripts to watch
		--ctype slurm		change the clustertype for now
					only slurm support
		--retry INT		change the retry times
		--unfinished STR	string to match files in submit dir to 
					test for unfinished jobs
		--tofinish STR		string to match files in submit dir to 
					test for jobs to finish
		--finished STR		string to match files in submit dir to 
					test for finished submit.sh.  blocks 
					if finished>=tofinish

example

Basic starting the daemon with debug messages

#load required modules
ml Proc-Daemon/0.23-foss-2016a-Perl-5.20.2-bare \
 Proc-PID-File/1.29-foss-2016a-Perl-5.20.2-bare \
 Log-Log4perl/1.47-foss-2016a
#optionally verbose te terminal
tail -f submitd.pl_*txt
#run the script
perl submitd.pl start \
  --watchlist submit.list \
  --tofinish Cleanup_*.sh \
  --finished Cleanup_*.finished
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment