Skip to content

Instantly share code, notes, and snippets.

@s1037989
Last active July 31, 2020 03:52
Show Gist options
  • Select an option

  • Save s1037989/0e8e77d55c14fa739f19d8cfdbbc69c1 to your computer and use it in GitHub Desktop.

Select an option

Save s1037989/0e8e77d55c14fa739f19d8cfdbbc69c1 to your computer and use it in GitHub Desktop.
Bash Job Queue
#!/usr/bin/env bash
# Original: https://hackthology.com/a-job-queue-in-bash.html
# This is a modification that adds receiving "work" through a named pipe
# Run the job queue and then just pass "work" to it whenever:
# $ q H1 ls -l /tmp
# $ q H2 date
# $ q H1 sleep 4
# $ q H3 echo hi
# Print the command-line synopsis on stderr and terminate with status 1.
usage() {
  printf '%s\n' "Usage: $0 [-h] [-l logfile] [-t tmpdir] [-w workers]" >&2
  exit 1
}
# Parse the command-line flags. -h prints the usage text; -l, -t and -w
# override the log file, the watched directory and the worker count.
# Anything else (unknown flag, missing argument) is deliberately ignored.
while getopts ":hl:t:w:" flag; do
  case "$flag" in
    h) usage ;;
    l) LOG=$OPTARG ;;
    t) TMPDIR=$OPTARG ;;
    w) WORKERS=$OPTARG ;;
    *) : ;;
  esac
done
shift "$((OPTIND - 1))"
# Fill in defaults for every setting that is still unset or empty.
: "${LOG:=/var/log/job_queue.log}"
: "${TMPDIR:=/tmp}"
: "${WORKERS:=1}"
# make the files
#   START      - regular file; every worker appends its ID once it is running
#   FIFO       - placeholder, replaced further down by the work-queue fifo
#   FIFO_LOCK  - flock target guarding reads from the work queue
#   START_LOCK - flock target guarding access to START
#   JOBS       - placeholder, replaced further down by the inotify event fifo
#   LOG_LOCK   - name only (-u); log() creates and removes the file itself
# The directory is quoted so paths with spaces survive, and the script stops
# at the first mktemp failure instead of carrying on with empty path names.
START=$(mktemp -p "${TMPDIR:-/tmp}" -t start-XXXX) || exit 1
FIFO=$(mktemp -p "${TMPDIR:-/tmp}" -t fifo-XXXX) || exit 1
FIFO_LOCK=$(mktemp -p "${TMPDIR:-/tmp}" -t lock-XXXX) || exit 1
START_LOCK=$(mktemp -p "${TMPDIR:-/tmp}" -t lock-XXXX) || exit 1
JOBS=$(mktemp -p "${TMPDIR:-/tmp}" -t jobs-XXXX) || exit 1
LOG_LOCK=$(mktemp -u -p "${TMPDIR:-/tmp}" -t lock-XXXX) || exit 1
## This is the "job" function: it does whatever work the queue workers
## are supposed to be doing.
##
## Arguments:
##   $1 - job id (only used in the log messages)
##   $2 - work item: a command name followed by its arguments, as one string
## Globals:
##   TMPDIR (read) - its last path component selects the server role
## Outputs:
##   the selected action's text on stdout; start/end/error messages via log()
job() {
  local id=$1 item=$2
  local cmd args server
  # -r keeps backslashes in the arguments intact; quoting the here-string
  # hands the work item to read without word splitting or globbing.
  read -r cmd args <<< "$item"
  server=${TMPDIR##*/}
  log "Running job on $server => id:$id\tcmd=$cmd\targs:$args"
  case "$server" in
    SH)
      case "$cmd" in
        *) log "Cmd Error: $cmd $args" ;;
      esac
      ;;
    PT)
      case "$cmd" in
        mscu_encrypt_send) echo mscu encrypt send "$args" ;;
        mscu_decrypt_recv) echo mscu decrypt recv "$args" ;;
        mscu_ccs_send) echo mscu ccs send "$args" ;;
        *) log "Cmd Error: $cmd $args" ;;
      esac
      ;;
    C)
      case "$cmd" in
        *) log "Cmd Error: $cmd $args" ;;
      esac
      ;;
    U)
      case "$cmd" in
        mscu_encrypt_recv) echo mscu encrypt recv "$args" ;;
        mscu_decrypt_send) echo mscu decrypt send "$args" ;;
        mscu_ccs_recv) echo mscu ccs recv "$args" ;;
        *) log "Cmd Error: $cmd $args" ;;
      esac
      ;;
    *) log "Unknown Server Error: $server" ;;
  esac
  log "Ending job on $server => id:$id\tcmd=$cmd\targs:$args"
}
# Write one timestamped line to stdout and append it to $LOG.
#
# Arguments: the message; backslash escapes such as \t are expanded (%b).
# Globals:   LOG (read)      - log file that is appended to
#            LOG_LOCK (read) - lock file path, created here and removed again
# Returns:   non-zero when the lock or the write fails.
#
# The lock is taken in blocking mode: with `flock -n` a message was silently
# dropped whenever another worker happened to be logging at the same moment.
# The critical section is a single printf, so the wait is short.
log() {
  (
    flock 9 || exit 1
    printf '%s %b\n' "$(date +"%Y-%m-%d %H:%M:%S.%N")" "$*" | tee -a -- "$LOG"
  ) 9> "$LOG_LOCK" && rm -f -- "$LOG_LOCK" # -f: a peer may have removed it already
}
# Print a fresh, not yet existing job file name (job-XXXX) inside the queue
# directory named $1, which is looked up next to $TMPDIR.
# Returns non-zero and prints nothing when $1 is empty.
#
# The directory is part of the template instead of being passed as
# `-p DIR -t`: with -t, GNU mktemp prefers an exported $TMPDIR over -p, so
# the job would end up in this queue's own directory, not in the target's.
new_job() {
  [[ -n "$1" ]] || return 1
  mktemp -u "${TMPDIR%/*}/$1/job-XXXX"
}
# Queue a command: write the remaining arguments, joined by spaces, as a
# single line into a new job file of queue $1.
#
# Arguments: $1 - queue (directory) name handed to new_job
#            $2... - the command line to run
# Returns:   non-zero when no job file name could be obtained or written.
#
# NOTE(review): the consumer loop reacts to the file's *create* event, which
# fires before the content below has been written - presumably the latency of
# the event pipeline hides this; confirm.
q() {
  local queue=$1 target
  shift
  target=$(new_job "$queue") || return
  # printf instead of echo so that a command such as "-n" is stored verbatim.
  printf '%s\n' "$*" > "$target"
}
## mktemp makes a regular file. Delete that and make a fifo.
## FIFO carries work items to the workers; JOBS carries the inotifywait
## output to the producer loop. Each fifo path is printed to stdout.
## All paths are quoted so a temp directory containing spaces still works.
rm -f -- "$FIFO"
mkfifo -- "$FIFO"
printf '%s\n' "$FIFO"
rm -f -- "$JOBS"
mkfifo -- "$JOBS"
printf '%s\n' "$JOBS"
## create a trap to cleanup on exit if we fail in the middle.
## cleanup removes both fifos, the start file and the two lock files.
## LOG_LOCK is not listed here because log() removes it after every message.
## rm -f with quoted operands keeps this quiet and correct when a path
## contains spaces or a file is already gone (e.g. cleanup running twice).
cleanup() {
  rm -f -- "$FIFO" "$START" "$FIFO_LOCK" "$START_LOCK" "$JOBS"
}
trap cleanup 0
## This is the worker to read from the queue.
##
## Arguments: $1 - worker ID (recorded in $START and used in messages)
## Globals:   FIFO, FIFO_LOCK, START_LOCK, START (read)
## Uses file descriptors 3 (queue), 4 (queue lock) and 5 (start lock), so it
## is meant to run in its own process (it is started with `&`).
## Each queue line is "<work_id> <work_item...>"; the loop ends at EOF, i.e.
## once every writer has closed the fifo.
work() {
  local ID=$1
  local work_id work_item read_status
  ## first open the fifo and locks for reading.
  exec 3< "$FIFO"
  exec 4< "$FIFO_LOCK"
  exec 5< "$START_LOCK"
  ## signal the worker has started.
  flock 5                         # obtain the start lock
  printf '%s\n' "$ID" >> "$START" # put my worker ID in the start file
  flock -u 5                      # release the start lock
  exec 5<&-                       # close the start lock file
  printf 'worker %s started\n' "$ID"
  while true; do
    ## try to read the queue
    flock 4 # obtain the fifo lock
    # -r: keep backslashes of the work item exactly as they were sent
    read -r -s -u 3 work_id work_item # read into work_id and work_item
    read_status=$?                    # save the exit status of read
    flock -u 4                        # release the fifo lock
    ## check the line read.
    if [[ $read_status -eq 0 ]]; then
      ## If read gives an exit code of 0 the read succeeded.
      # got a work item. do the work
      log "worker $ID got work_id=$work_id work_item=$work_item"
      ## Run the job in a subshell. That way any exit calls do not kill
      ## the worker process.
      (job "$work_id" "$work_item")
    else
      ## Any other exit code indicates an EOF.
      break
    fi
  done
  # clean up the fd(s)
  exec 3<&-
  exec 4<&-
  log "$ID: done working"
}
## Start the workers.
## They are launched before fd 3 is opened below, so none of them inherits
## the write end of the queue.
for ((i = 1; i <= WORKERS; i++)); do
  log "will start worker $i"
  work "$i" &
done
## Open the fifo for writing.
exec 3> "$FIFO"
## Open the start lock for reading
exec 4< "$START_LOCK"
## Wait for the workers to start
## (every worker appends one line to $START, so the line count is the
## number of workers that are up).
while true; do
  flock 4
  started=$(wc -l < "$START")
  flock -u 4
  if [[ $started -eq $WORKERS ]]; then
    break
  fi
  log "waiting, started $started of $WORKERS"
  # Pause between polls; without it this loop spins at full speed and
  # writes a log line on every iteration.
  sleep 0.1
done
exec 4<&-
## utility function to send the jobs to the workers
##
## Arguments: $1 - work id; $2... - the work item (joined with spaces)
## Outputs:   one line "<work_id> <work_item>" on fd 3, which must already
##            be open for writing; the same text is also logged.
send() {
  local work_id=$1
  shift
  local work_item="$*"
  log "sending $work_id $work_item"
  printf '%s %s\n' "$work_id" "$work_item" >&3 ## the fifo is fd 3
}
## Produce the jobs to run.
## inotifywait reports every file created in $TMPDIR through the $JOBS fifo.
## Files whose path contains "job-" hold one command line each; the file is
## read, deleted and its content is sent to the workers under a running id.
## A job file containing just "exit" stops the watcher and ends the loop.
i=0
inotifywait -m -e create --format '%w%f' "$TMPDIR" > "$JOBS" &
iwait_pid=$!
# IFS= and -r: take the reported path literally (spaces, backslashes).
while IFS= read -r cmdfile; do
  [[ "$cmdfile" =~ job- ]] || continue
  log "Reading command from $cmdfile"
  cmd=$(< "$cmdfile")
  rm -f -- "$cmdfile"
  if [[ "$cmd" == "exit" ]]; then
    kill "$iwait_pid"
    break
  fi
  send "$i" "$cmd"
  i=$((i + 1))
done < "$JOBS"
## close the fifo (the workers see EOF once the write end is gone)
exec 3>&-
## disable the cleanup trap
trap '' 0
## It is safe to delete the files because the workers
## already opened them. Thus, only the names are going away
## the actual files will stay there until the workers
## all finish.
cleanup
## now wait for all the workers.
wait
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment