Created
January 5, 2018 18:17
-
-
Save davisjam/42d80b4e33d9206e13cb54dffea16480 to your computer and use it in GitHub Desktop.
Distribute an embarrassingly-parallel workload across a worker cluster. Dependency-free!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env perl | |
# Author: Jamie Davis <[email protected]> | |
# Description: | |
# - Perform a bunch of operations, using a cluster of worker nodes. | |
# - To cleanly exit early, check the first few lines of stderr for a file to touch. | |
# Dependencies. | |
use strict; | |
use warnings; | |
use threads; | |
use threads::shared; | |
use JSON::PP; | |
use Getopt::Long; | |
use Time::HiRes qw( usleep ); | |
use Carp; | |
# Globals.
my %globals;    # Validated settings, filled in from &processArgs.

# Variables that exist only to be lock()ed.
my $LOG_LOCK : shared;       # Logging.
my $TASK_LOCK : shared;      # Getting and delivering tasks.
my $RESULT_LOCK : shared;    # Emitting results.

# Worker management: set to 1 once every worker thread has been joined.
my $WORKERS_DONE : shared = 0;

# Task queue.
my $tasksLoaded = 0;         # Set by &loadTasks.
my $TASKS : shared;          # array ref
my $NO_TASKS_LEFT = -1;      # Sentinel returned by &getNextTask.

# Result file.
my $RESULT_FH;
my $resultFHOpened : shared = 0;

# External signaling: touching this file asks the workers to stop early.
my $exitEarlyFile = "/tmp/distributed-work-exit-early-pid" . $$;
unlink $exitEarlyFile;       # Clear any stale request left by a previous process with this pid.
my $EXIT_EARLY : shared = 0;
# Process args.
# Capture the command line first: GetOptions consumes @ARGV.
my $invocation = join(" ", $0, join(" ", @ARGV));
my %args;
my @optionSpecs = (
  "cluster=s",
  "workScript=s",
  "taskFile=s",
  "resultFile=s",
  "workers=s@",
  "notWorkers=s@",
  "unusedCores=i",
  "copy=s",
  "PATHprefix=s",
  "verbose",
  "help",
);
GetOptions(\%args, @optionSpecs) or die "Error parsing args\n";
%globals = &processArgs(%args);

&log("Invocation: $invocation");
my @workerNames = map { $_->{host} } @{$globals{cluster}};
&log(scalar(@{$globals{cluster}}) . " workers: <@workerNames>");

# Load tasks into the shared queue.
&log("Loading tasks from $globals{taskFile}");
my @loadedTasks = &loadTasks("taskFile"=>$globals{taskFile});
$TASKS = shared_clone([@loadedTasks]);

# Open the result file before any worker thread exists.
&log("Opening resultFile $globals{resultFile}");
$RESULT_FH = &openResultFile($globals{resultFile});

# Start and await workers.
&log("Starting workers");
&log("To cleanly exit early, run the following command:\n touch $exitEarlyFile");
&runWorkers("cluster"=>$globals{cluster});

&log("See results in $globals{resultFile}");
&noMoreResults();

# Write out any remaining tasks.
if ($EXIT_EARLY) {
  &log("Exited early, writing out any remaining tasks");
  &writeOutRemainingTasks();
}

exit(0);
# Launch worker threads for every node in the cluster and wait for them.
#
# If --copy was given (copy_src/copy_dest in %globals), the copy is propagated
# to every node first. One thread is started per used core on each node, plus
# one thread that watches for the exit-early file. Returns once all of them
# have been joined.
#
# input: %args keys: cluster
#   cluster: listref of node hashrefs with keys host, logicalCores and optionally unusedCores
# output: ()
sub runWorkers {
  my (%args) = @_;
  &assertUsage("runWorkers: Error, usage: hash with keys: cluster", $args{cluster});

  # Propagate copy
  if ($globals{copy_src} and $globals{copy_dest}) {
    &log("runWorkers: Propagating copy");
    &propagateCopyDir("src"=>$globals{copy_src}, "dest"=>$globals{copy_dest}, "destAccessCreds"=>$args{cluster});
  }

  # Start workers
  &log("runWorkers: Starting workers");
  my @my_threads;
  for my $worker (@{$args{cluster}}) {
    # A per-node unusedCores overrides the global recommendation.
    # Test definedness, not truth, so that an explicit 0 is honored.
    my $unusedCores = defined($worker->{unusedCores})
      ? $worker->{unusedCores}
      : &min($globals{unusedCores}, $worker->{logicalCores});

    # Never go below zero, even if the override exceeds the core count.
    my $usedCores = &max(0, $worker->{logicalCores} - $unusedCores);
    &log("runWorkers: Starting $usedCores processes on worker $worker->{host}, leaving $unusedCores unused cores");

    for my $core (1 .. $usedCores) {
      # Each thread gets its own copy of the node, tagged with a per-node id.
      my %workerWithID = %{$worker};
      $workerWithID{id} = $core;
      &log("runWorkers: $workerWithID{host}:$workerWithID{id}");
      push @my_threads, threads->create(\&thread_worker, \%workerWithID);
    }
  }

  my $thread_exitEarly = threads->create(\&thread_exitEarly, ($exitEarlyFile));

  &log("runWorkers: Waiting on my " . scalar(@my_threads) . " threads");
  for my $thr (@my_threads) {
    $thr->join();
    # Log only after join() returns, i.e. once the thread really has finished.
    &log("runWorkers: thread finished!");
  }

  # If not already done, signal $thread_exitEarly.
  { lock($WORKERS_DONE);
    $WORKERS_DONE = 1;
  }

  &log("runWorkers: Waiting on thread_exitEarly");
  $thread_exitEarly->join();

  &log("runWorkers: Done");
  return;
}
# Drain the task queue.
#
# input: ()
# output: (@remainingTasks) every task &getNextTask still had to offer, in queue order
sub getRemainingTasks {
  my @remainingTasks;

  # Pull tasks until the queue reports the $NO_TASKS_LEFT sentinel.
  for (my $t = &getNextTask(); $t ne $NO_TASKS_LEFT; $t = &getNextTask()) {
    push @remainingTasks, $t;
  }

  my $numRemaining = scalar(@remainingTasks);
  my @strings = map { &task_toString($_) } @remainingTasks;
  &log("getRemainingTasks: Got " . $numRemaining . " remaining tasks: <@strings>");

  return @remainingTasks;
}
# Save every task still in the queue to a file under /tmp, one JSON-encoded
# task per line. Any previous file of the same name is removed first.
#
# input: ()
# output: ($anyRemainingTasks)
sub writeOutRemainingTasks {
  # A file to write to.
  my $remainingWorkFile = "/tmp/distributed-work-remainingWork-pid" . $$ . ".txt";
  unlink $remainingWorkFile;

  my @remainingTasks = &getRemainingTasks();
  my $anyRemainingTasks = (0 < scalar(@remainingTasks));

  if ($anyRemainingTasks) {
    # Convert back to JSON and write out.
    my $contents = join("\n", map { encode_json($_->{task}) } @remainingTasks);
    &writeToFile("file"=>$remainingWorkFile, "contents"=>$contents);
    &log(scalar(@remainingTasks) . " tasks remaining, see $remainingWorkFile");
  }

  return $anyRemainingTasks;
}
# Thread.
# Repeatedly take a task from the queue, run it via &work and emit the result.
# Returns when the queue is empty or when $EXIT_EARLY has been set.
#
# input: ($worker) worker from &getClusterInfo, with extra key 'id'
# output: ()
sub thread_worker {
  my ($worker) = @_;

  # "host:id" identifies this worker in log lines and in emitted results.
  my $workerStr = join(":", $worker->{host}, $worker->{id});

  while (1) {
    # Should we EXIT_EARLY? Checked before every task.
    my $exitEarly;
    {
      lock($EXIT_EARLY);
      $exitEarly = $EXIT_EARLY;
    }
    if ($exitEarly) {
      &log("$workerStr: exiting early");
      return;
    }

    # Get and complete a task.
    my $task = &getNextTask();
    if ($task eq $NO_TASKS_LEFT) {
      &log("$workerStr: No tasks left");
      return;
    }

    &log("$workerStr: task " . scalar(&task_toString($task)));
    my $out = &work("worker"=>$worker, "task"=>$task);
    &log("$workerStr: completed task $task->{id}. Output: $out");

    &emitResult({ "task"=>$task, "worker"=>$workerStr, "output"=>$out });
  }
}
# Copy a local file to a worker with scp.
#
# A failed copy is logged (exit status and scp output) rather than silently
# ignored. The return value is unchanged either way, so existing callers keep
# working.
#
# input: %args: keys: src dest worker
#   src: local path
#   dest: path on the worker
#   worker: hashref with keys port user host
# output: $dest
sub transferFile {
  my %args = @_;
  &assertUsage("transferFile: usage: src dest worker", $args{src}, $args{dest}, $args{worker});

  my $worker = $args{worker};
  my ($out, $rc) = &cmd("scp -P$worker->{port} $args{src} $worker->{user}\@$worker->{host}:$args{dest}");
  if ($rc) {
    &log("transferFile: Error, scp of $args{src} to $worker->{host}:$args{dest} failed with rc $rc: $out");
  }

  return $args{dest};
}
# Write a string to a file, replacing any existing contents.
#
# Dies (confess) if the file cannot be opened, written or closed. Buffered
# write errors only surface at close, so close is checked too.
#
# input: %args: keys: file contents
# output: $file
sub writeToFile {
  my %args = @_;
  &assertUsage("writeToFile: usage: file contents", $args{file}, $args{contents});

  open(my $fh, '>', $args{file})
    or confess "writeToFile: Error, could not open <$args{file}> for writing: $!\n";
  print {$fh} $args{contents}
    or confess "writeToFile: Error, could not write to <$args{file}>: $!\n";
  close($fh)
    or confess "writeToFile: Error, could not close <$args{file}>: $!\n";

  return $args{file};
}
# Run one task on one worker: write the task's JSON to a temporary file, copy
# it to the worker, run the workScript against it over ssh and return the
# output.
#
# input: (%args) keys: worker task
#   worker entry from &getClusterInfo
#   task created by &task_create
# output: ($out)
sub work {
  my %args = @_;
  &assertUsage("work: usage: worker task", $args{worker}, $args{task});

  # File names embed the pid and the thread id.
  my $tid = threads->tid();
  my $localTaskFile = sprintf("/tmp/parallel-process-repo-%s_%s-LOCAL.json", $$, $tid);
  my $remoteTaskFile = sprintf("/tmp/parallel-process-repo-%s_%s-REMOTE.json", $$, $tid);

  # Create the task file locally, ship it, and drop the local copy.
  my $encodedTask = encode_json($args{task}->{task});
  &writeToFile("file"=>$localTaskFile, "contents"=>$encodedTask);
  &transferFile("src"=>$localTaskFile, "dest"=>$remoteTaskFile, "worker"=>$args{worker});
  unlink $localTaskFile;

  # Process task remotely and log output.
  # The workScript's stderr is discarded; the remote task file is removed afterwards.
  my $PATHprefix = "";
  if ($globals{PATHprefix}) {
    $PATHprefix = "PATH=$globals{PATHprefix}:\$PATH";
  }
  my $remoteCmd = "$PATHprefix $globals{workScript} $remoteTaskFile 2>/dev/null; rm $remoteTaskFile";
  my $out = &remoteCommand("accessCreds"=>$args{worker}, "command"=>$remoteCmd);

  # NOTE(review): this unlinks the same path on the local machine; it only has
  # an effect if a file of that name exists here.
  unlink $remoteTaskFile;

  return $out;
}
# Thread.
# Poll every 100 ms until one of two things happens:
#   - $exitEarlyFile exists: set $EXIT_EARLY and return.
#   - the workers have finished ($WORKERS_DONE): return.
# The file is checked first on every iteration.
#
# input: ($exitEarlyFile)
# output: ()
sub thread_exitEarly {
  my ($exitEarlyFile) = @_;

  until (-f $exitEarlyFile) {
    # Workers done?
    my $workersDone;
    { lock($WORKERS_DONE);
      $workersDone = $WORKERS_DONE;
    }
    return if ($workersDone);

    usleep(100*1000); # 100 ms
  }

  # The file appeared: tell the workers to exit early.
  { lock($EXIT_EARLY);
    $EXIT_EARLY = 1;
  }
  return;
}
# Load and validate the cluster description.
#
# The file must exist, be named *.js or *.json (the same pattern &processArgs
# accepts), and contain a JSON array of node objects. Each node must pass
# &_isClusterNodeValid. Nodes without a "logicalCores" field are queried with
# `nproc` over ssh.
#
# Dies with a descriptive message on any failure.
#
# input: ($clusterFile)
# output: @cluster: list of node objects with keys: host user port logicalCores
sub getClusterInfo {
  my ($clusterFile) = @_;
  &assertUsage("getClusterInfo: Error, usage: (clusterFile)", $clusterFile);

  if (not -f $clusterFile or $clusterFile !~ m/\.js(on)?$/i) {
    die "getClusterInfo: Error, invalid clusterFile <$clusterFile>\n";
  }

  # Read the file directly rather than through the shell, so unusual
  # characters in the name are harmless and $! describes the actual failure.
  open(my $fh, '<', $clusterFile)
    or die "getClusterInfo: Error, could not read clusterFile <$clusterFile>: $!\n";
  my $contents = do { local $/; <$fh> };
  close($fh);
  $contents = "" if (not defined $contents);

  my $cluster = eval {
    return decode_json($contents);
  };
  if ($@) {
    die "getClusterInfo: Error parsing clusterFile <$clusterFile>: $@\n";
  }
  if (ref($cluster) ne 'ARRAY') {
    die "getClusterInfo: Error, clusterFile <$clusterFile> must contain a JSON array of nodes\n";
  }

  # Confirm nodes are valid.
  my @cluster = @$cluster;
  my $i = 0;
  for my $node (@cluster) {
    if (not &_isClusterNodeValid($node)) {
      die "getClusterInfo: Error, node $i is invalid (0-indexed)\n";
    }
    $i++;
  }

  # Augment with a "logicalCores" field if none provided
  for my $node (@cluster) {
    if (not defined $node->{logicalCores}) {
      my $out = &remoteCommand("accessCreds"=>$node, "command"=>"nproc");
      # &cmd merges stderr into the output, so accept any line that consists
      # solely of digits instead of requiring the whole output to be a number.
      if ($out =~ m/^(\d+)\s*$/m) {
        $node->{logicalCores} = int($1);
      }
      else {
        die "getClusterInfo: Error, could not get logical cores for node $node->{host}:\n$out\n";
      }
    }
    &log("$node->{host} has $node->{logicalCores} logical cores");
  }

  return @cluster;
}
# Check that a cluster node is a hashref with true values for the keys
# host, port and user.
#
# Anything that is not a hash reference (undef, a string, an array ref, ...)
# is reported as invalid instead of raising a dereference error under
# "use strict".
#
# input: ($clusterNode)
# output: ($isValid) 1 if valid, else 0
sub _isClusterNodeValid {
  my ($node) = @_;

  return 0 if (ref($node) ne 'HASH');

  for my $key ("host", "port", "user") {
    return 0 if (not $node->{$key});
  }

  return 1;
}
###
# Usage message, arg parsing.
###

# Build the short usage synopsis.
#
# input: ()
# output: ($terseUsage) multi-line string ending in a newline
sub getTerseUsage {
  my @usageLines = (
    "Usage: $0 --cluster C.json --workScript W --taskFile F",
    "[--resultFile R] [--workers w1,...] [--notWorkers w1,...]",
    "[--unusedCores N]",
    "[--copy src:dest] [--PATHprefix dir1:...]",
    "[--verbose] [--help]",
  );
  return join("\n", @usageLines) . "\n";
}
# Print the short usage synopsis to stdout and terminate with exit status 0.
#
# input: ()
# output: does not return
sub shortUsage {
  my $usage = &getTerseUsage();
  print $usage;
  exit(0);
}
# Print the full help text (description, synopsis and per-option notes) to stdout.
# Does not exit; the caller decides what happens next.
#
# input: ()
sub longUsage {
  my $terseUsage = &getTerseUsage();
  print <<"END_LONG_USAGE";
Description: Distribute tasks across workers
$terseUsage
--cluster C.json JSON-formatted cluster of workers
Should be an array of "node" objects with minimal keys: host port user [logicalCores] [unusedCores]
host, port, user: suitable for passwordless ssh
[logicalCores]: skip query of node for # logical cores
[unusedCores]: override global --unusedCores
--workScript W Script to execute against each task
**Must exist on every worker**
Argument is a filename, its stdout is saved in resultFile
--taskFile F One task per line, JSON-encoded.
[--resultFile R] One result per line, NOT guaranteed in the same order
JSON-encoded.
[--workers w1,... | --notWorkers w1,... ] Workers to use | workers not to use
[--unusedCores N] Cores to leave available on a worker
[--copy src:dest] Copy src to dest on every worker before running workScript
Must be that src != dest. Can be a file or a dir.
[--PATHprefix dir1:...] Prefix PATH with this string when executing workScript
[--verbose]
[--help]
END_LONG_USAGE
}
# Process args after GetOptions, ensure validity, etc.
#
# With no args at all, prints the short usage and exits 0. With --help, prints
# the long usage and exits 0. If any argument is invalid, every problem is
# logged, the short usage is printed, and the process exits with status 1.
#
# Side effect: any existing resultFile is removed.
#
# input: (%args) from GetOptions
# output: (%globals) with keys:
#   cluster listref of hashrefs representing nodes to use in the worker cluster
#   workScript script to execute
#   taskFile one task per line
#   resultFile one result per line
#   unusedCores cores to leave idle on each worker
#   [copy_src dir on manager]
#   [copy_dest dir on worker]
#   [PATHprefix prefix for PATH when invoking workScript]
#   verbose extra loud
sub processArgs {
  my %args = @_;
  my $invalidArgs = 0;

  # Bail out on no args or help.
  if (not scalar(keys %args)) {
    &shortUsage();
    exit 0;
  }
  if ($args{help}) {
    &longUsage();
    exit 0;
  }

  # cluster
  my @cluster;
  if ($args{cluster} and -f $args{cluster} and $args{cluster} =~ m/\.js(on)?$/i) {
    &log("Cluster file $args{cluster}");
    @cluster = &getClusterInfo($args{cluster});

    # workers: repeated and comma-separated values are flattened,
    # so "--workers x,y --workers z" becomes (x, y, z).
    if ($args{workers}) {
      $args{workers} = [split(",", join(",", @{$args{workers}}))];
    }
    else {
      $args{workers} = [map { $_->{host} } @cluster]; # Default to include all
    }

    # Filter in workers
    &log("Filtering in workers <@{$args{workers}}>");
    @cluster = grep { &listContains($args{workers}, $_->{host}) } @cluster;

    # notWorkers: flattened the same way as workers.
    if ($args{notWorkers}) {
      $args{notWorkers} = [split(",", join(",", @{$args{notWorkers}}))];
    }
    else {
      $args{notWorkers} = []; # Default to exclude none
    }

    # Filter out notWorkers
    &log("Filtering out workers <@{$args{notWorkers}}>");
    @cluster = grep { not &listContains($args{notWorkers}, $_->{host}) } @cluster;

    my @survivingNames = map { $_->{host} } @cluster;
    if (@survivingNames) {
      &log("Using " . scalar(@cluster) . " workers <@survivingNames>");
    }
    else {
      &log("Error, no workers");
      $invalidArgs = 1;
    }
  }
  else {
    &log("Error, invalid cluster file");
    $invalidArgs = 1;
  }

  # workScript
  # Can't use -f.
  # workScript might not exist *anywhere* yet before we honor --copy.
  # And this node might not be a worker so -f would still fail.
  # It is still a required argument, so at least insist that it was given.
  if ($args{workScript}) {
    &log("Using workScript $args{workScript}");
  }
  else {
    &log("Error, missing workScript");
    $invalidArgs = 1;
  }

  # taskFile
  if ($args{taskFile} and -f $args{taskFile}) {
    &log("Using taskFile $args{taskFile}");
  }
  else {
    &log("Error, invalid taskFile");
    $invalidArgs = 1;
  }

  # resultFile
  if (not $args{resultFile}) {
    $args{resultFile} = "/tmp/parallel-work-pid$$-results";
  }
  unlink $args{resultFile};
  &log("Using resultFile $args{resultFile}");

  # unusedCores
  if (not defined $args{unusedCores}) {
    $args{unusedCores} = 0;
  }
  if (0 <= $args{unusedCores}) {
    &log("Using unusedCores $args{unusedCores}");
  }
  else {
    &log("Error, invalid unusedCores $args{unusedCores}");
    $invalidArgs = 1;
  }

  # copy
  my ($copySrc, $copyDest);
  if (defined $args{copy}) {
    my @spl = split(":", $args{copy});
    if (scalar(@spl) == 2) {
      my ($src, $dest) = @spl;
      if ($src ne $dest and -e $src) {
        &log("Using copySrc $src copyDest $dest");
        $copySrc = $src;
        $copyDest = $dest;
      }
      else {
        &log("Error, invalid copy src $src dest $dest. Must have src != dest, and src must exist");
        $invalidArgs = 1;
      }
    }
    else {
      &log("Error, malformed copy. Must be src:dest");
      $invalidArgs = 1;
    }
  }

  # PATHprefix
  if (not defined $args{PATHprefix}) {
    $args{PATHprefix} = "";
  }

  # verbose
  if (not defined $args{verbose}) {
    $args{verbose} = 0;
  }

  # Error out on invalid args.
  # Print the usage directly: &shortUsage exits with status 0, which would
  # hide the failure from whoever invoked us.
  if ($invalidArgs) {
    print &getTerseUsage();
    exit 1;
  }

  my %globals = ("cluster" => \@cluster,
    "workScript" => $args{workScript},
    "taskFile" => $args{taskFile},
    "resultFile" => $args{resultFile},
    "unusedCores" => $args{unusedCores},
    "copy_src" => $copySrc,
    "copy_dest" => $copyDest,
    "PATHprefix" => $args{PATHprefix},
    "verbose" => $args{verbose},
  );
  return %globals;
}
###
# Task management.
###

# Read the task file and turn every non-blank line into a task.
#
# May only be called once; later calls return nothing. The file is read
# directly (not through the shell). Blank lines are skipped. A line that is
# not valid JSON is reported with its line number.
#
# input: (%args) keys: taskFile
# output: (@tasks)
#   Each elt contains a task from &task_create,
#   where the 'task' is a json-decoded version of one of the lines from the taskFile.
#   Task ids are 1-indexed, in file order.
sub loadTasks {
  my %args = @_;
  &assertUsage("Usage: taskFile", $args{taskFile});

  lock($TASK_LOCK);
  return if ($tasksLoaded);
  $tasksLoaded = 1;

  open(my $fh, '<', $args{taskFile})
    or confess "loadTasks: Error, could not open taskFile <$args{taskFile}>: $!\n";
  my @taskLines = <$fh>;
  close($fh);
  chomp @taskLines;

  my @tasks;
  my $lineNum = 0;
  for my $line (@taskLines) {
    $lineNum++;
    # A blank line (e.g. a trailing one) is not a task.
    next if ($line =~ m/^\s*$/);

    &log("Task line: <$line>");
    my $task = eval { decode_json($line) };
    if ($@) {
      confess "loadTasks: Error, line $lineNum of taskFile <$args{taskFile}> is not valid JSON: $@\n";
    }

    my $id = scalar(@tasks) + 1; # 1-indexed for sanity
    push @tasks, &task_create("id"=>$id, "task"=>$task);
  }

  return @tasks;
}
# Remove and return the task at the front of the shared queue $TASKS.
# Holds $TASK_LOCK for the duration, and asserts that tasks were loaded.
#
# input: ()
# output: ($task) or $NO_TASKS_LEFT
#   $task as created by &task_create.
sub getNextTask {
  lock($TASK_LOCK);
  &assert(($tasksLoaded), "getNextTask: Tasks never loaded");

  return $NO_TASKS_LEFT unless (@{$TASKS});

  my $nextTask = shift @{$TASKS};
  my $taskStr = &task_toString($nextTask);
  my $numLeft = scalar(@{$TASKS});
  &log("getNextTask: got <" . $taskStr . ">, " . $numLeft . " tasks remaining");

  return $nextTask;
}
# Bundle an id and a decoded task payload into a task object.
#
# input: (%args) keys: id task
# output: ($task) a ref with keys: id task
sub task_create {
  my (%args) = @_;
  # The message names this sub so a failed assertion points at the right place.
  &assertUsage("task_create: usage: id task", $args{id}, $args{task});

  return { "id" => $args{id},
    "task" => $args{task},
  };
}
# Render a task as "<id>: <JSON of its payload>".
#
# input: ($task) a ref with keys: id task
# output: ($string)
sub task_toString {
  my ($task) = @_;
  my $encodedPayload = encode_json($task->{task});
  return join(": ", $task->{id}, $encodedPayload);
}
###
# Result management.
###

# Open the result file for writing (truncating it) unless it is already open.
# Dies (confess) if the file cannot be opened.
#
# input: ($resultFile)
# output: ($FH) the file-level handle $RESULT_FH
sub openResultFile {
  my ($resultFile) = @_;

  lock($RESULT_LOCK);
  if (not $resultFHOpened) {
    &log("openResultFile: Opening $resultFile for results");
    open($RESULT_FH, ">", $resultFile) or confess "Error, could not open resultFile <$resultFile>: $!\n";
    $resultFHOpened = 1;
  }

  return $RESULT_FH;
}
# Append one result to the result file as a single JSON-encoded line.
# Holds $RESULT_LOCK for the duration. Dies (confess) if &openResultFile
# has not been called.
#
# input: ($result) result object with keys: task worker output
#   task: from &getNextTask
#   worker: "host:id" string
#   output: command-line output
# output: ()
sub emitResult {
  my ($result) = @_;

  lock($RESULT_LOCK);
  if (not $resultFHOpened) {
    confess "Error, must call openResultFile first\n";
  }

  my $encodedResult = encode_json($result);
  &log("emitResult: Emitting: <$encodedResult>");

  # Turn on autoflush for this handle so the whole line is written while the
  # lock is held. NOTE(review): each thread presumably has its own buffered
  # copy of the handle; without flushing, partial lines from different
  # threads could interleave in the file -- confirm.
  my $previousFH = select($RESULT_FH);
  $| = 1;
  select($previousFH);

  print {$RESULT_FH} "$encodedResult\n"
    or &log("emitResult: Error, could not write result: $!");

  return;
}
# Close the result file handle, if there is one. A failed close is logged.
#
# input: ()
sub noMoreResults {
  return unless ($RESULT_FH);

  unless (close($RESULT_FH)) {
    &log("Error, closing result_fh failed: $!");
  }
}
###
# Handle copy
###

# Copy src to dest on every destination, one helper thread per destination,
# then wait for all of them. Dies (confess) if any copy failed or any helper
# thread did not return a status.
#
# input: (%args) keys: src dest destAccessCreds
#   src/dest: passed through to &remoteCopy
#   destAccessCreds: array ref of hashrefs of destAccessCreds for use with remoteCopy
# output: ()
sub propagateCopyDir {
  my %args = @_;
  &assertUsage("propagateCopyDir: usage: src dest destAccessCreds", $args{src}, $args{dest}, $args{destAccessCreds});

  # Parallel propagation.
  my @helpers;
  for my $cred (@{$args{destAccessCreds}}) {
    # Scalar assignment: the helper runs in scalar context and returns one status.
    my $thr = threads->create(sub {
        my %copyArgs = @_;
        return &remoteCopy(%copyArgs);
      }, "src"=>$args{src}, "dest"=>$args{dest}, "destAccessCreds"=>$cred);
    push @helpers, $thr;
  }

  # Confirm that all propagation succeeded.
  my @results;
  for my $helper (@helpers) {
    # Join in scalar context so that a helper that died still yields an entry.
    my $rc = $helper->join();
    push @results, (defined $rc ? $rc : "died");
  }

  if (grep { $_ ne 0 } @results) {
    confess "Error, at least one copy failure: <@results>\n";
  }

  return;
}
###
# Utility
###

# String-equality membership test.
#
# input: (\@list, $e)
# output: 1 if $e is in @list, else 0
sub listContains {
  my ($list, $e) = @_;
  my $numMatches = grep { $_ eq $e } @$list;
  return ($numMatches ? 1 : 0);
}
# Numeric minimum.
#
# input: (@nums)
# output: ($min) smallest element, or undef for an empty list
sub min {
  my ($smallest, @others) = @_;
  for my $candidate (@others) {
    $smallest = $candidate if ($candidate < $smallest);
  }
  return $smallest;
}
# Numeric maximum.
#
# input: (@nums)
# output: ($max) largest element, or undef for an empty list
sub max {
  my ($largest, @others) = @_;
  for my $candidate (@others) {
    $largest = $candidate if ($largest < $candidate);
  }
  return $largest;
}
# If $cond is false, print "ERROR: $msg" to stdout and exit with status 1.
# Otherwise do nothing.
#
# input: ($cond, $msg)
sub assert {
  my ($cond, $msg) = @_;
  return if ($cond);

  print "ERROR: $msg\n";
  exit 1;
}
# Assert (via &assert) that every listed value is defined; $msg is the
# message reported when at least one is not.
#
# input: ($msg, @varsThatShouldBeDefined)
# output: ()
sub assertUsage {
  my ($msg, @shouldBeDefined) = @_;
  my $numUndefined = grep { not defined $_ } @shouldBeDefined;
  &assert(($numUndefined == 0), $msg);
}
# Log a shell command, run it with stderr merged into stdout, and return
# what it printed together with its exit status.
#
# input: ($cmd)
# output: ($out, $rc)
sub cmd {
  my ($cmd) = @_;
  &log($cmd);

  my $out = qx($cmd 2>&1);
  my $rc = $? >> 8; # High byte of the wait status is the exit code.

  return ($out, $rc);
}
# Write a timestamped message to stderr while holding $LOG_LOCK.
# Shares its name with the builtin log(), so call it as &log(...).
#
# input: ($msg)
sub log {
  my ($msg) = @_;
  my $now = scalar(localtime());

  lock($LOG_LOCK);
  print STDERR $now . ": " . $msg . "\n";
}
# Run a command on a remote host over ssh and return what it printed
# (&cmd merges stderr into stdout).
#
# The command is passed to ssh inside single quotes. Any single quote in the
# command itself is escaped so that it cannot end the quoting early.
#
# input: (%args) keys: accessCreds command
#   accessCreds: hashref, keys: port user host
#     hint: a worker object can be used as accessCreds
#   command: string to execute over ssh
# output: ($out)
sub remoteCommand {
  my %args = @_;
  &assertUsage("remoteCommand: Error, usage: hash with keys: accessCreds command", $args{accessCreds}, $args{command});

  # Inside '...' the shell has no escape character, so each ' becomes '\''
  # (close the quote, emit an escaped quote, reopen the quote).
  (my $quotedCommand = $args{command}) =~ s/'/'\\''/g;

  my $creds = $args{accessCreds};
  my $cmd = "ssh -p$creds->{port} $creds->{user}\@$creds->{host} '$quotedCommand'";
  my ($out, $rc) = &cmd($cmd); # The exit status is not part of this sub's interface.

  return ($out);
}
# Copy local file to remote host
# Recursively copy dirs (scp -r)
#
# input: (%args) keys: src dest destAccessCreds
#   src/dest: files or dirs
#   destAccessCreds: hashref, keys: port user host
# output: ($rc) 0 success, non-zero failure
sub remoteCopy {
  my %args = @_;

  my $creds = $args{destAccessCreds};
  my $target = "$creds->{user}\@$creds->{host}:$args{dest}";
  my $cmd = "scp -r -P$creds->{port} $args{src} $target";

  # Only the exit status of scp is reported; its output is discarded.
  my ($out, $rc) = &cmd($cmd);
  return ($rc);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I've made a few extensions, haven't figured out how to push yet.