Last active
July 31, 2020 03:52
-
-
Save s1037989/0e8e77d55c14fa739f19d8cfdbbc69c1 to your computer and use it in GitHub Desktop.
Bash Job Queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Original: https://hackthology.com/a-job-queue-in-bash.html
# This is a modification that adds receiving "work" through a named pipe (FIFO).
# Run the job queue, then pass "work" to it at any time:
# $ q H1 ls -l /tmp
# $ q H2 date
# $ q H1 sleep 4
# $ q H3 echo hi
# Print the command-line synopsis on stderr and terminate the script with
# exit status 1.
usage() {
  printf '%s\n' "Usage: $0 [-h] [-l logfile] [-t tmpdir] [-w workers]" >&2
  exit 1
}
# Parse the command-line options. The leading ':' in the optstring puts
# getopts in silent mode, so missing arguments (':') and unknown flags ('?')
# are reported here. They are still skipped rather than treated as fatal, but
# no longer silently.
while getopts ":hl:t:w:" o; do
  case "$o" in
    h) usage ;;
    l) LOG=$OPTARG ;;
    t) TMPDIR=$OPTARG ;;
    w) WORKERS=$OPTARG ;;
    :) printf '%s: option -%s requires an argument (ignored)\n' "$0" "$OPTARG" >&2 ;;
    *) printf '%s: unknown option -%s (ignored)\n' "$0" "$OPTARG" >&2 ;;
  esac
done
shift $((OPTIND - 1))

# Fall back to defaults for anything not given by an option or the environment.
LOG=${LOG:-/var/log/job_queue.log}
TMPDIR=${TMPDIR:-/tmp}
WORKERS=${WORKERS:-1}

# A zero or non-numeric worker count would start no worker at all, and the
# queue FIFO would then never get a reader. Refuse it up front.
if ! [[ "$WORKERS" =~ ^[1-9][0-9]*$ ]]; then
  printf '%s: -w expects a positive integer, got "%s"\n' "$0" "$WORKERS" >&2
  usage
fi
# make the files
# Every scratch file lives in $TMPDIR (or /tmp when it is unset). Abort right
# away if one of them cannot be created instead of continuing with an empty
# path.
START=$(mktemp -p "${TMPDIR:-/tmp}" -t start-XXXX) || exit 1
FIFO=$(mktemp -p "${TMPDIR:-/tmp}" -t fifo-XXXX) || exit 1
FIFO_LOCK=$(mktemp -p "${TMPDIR:-/tmp}" -t lock-XXXX) || exit 1
START_LOCK=$(mktemp -p "${TMPDIR:-/tmp}" -t lock-XXXX) || exit 1
JOBS=$(mktemp -p "${TMPDIR:-/tmp}" -t jobs-XXXX) || exit 1
# -u only picks a name without creating the file; the log lock is created on
# demand by whoever redirects to it.
LOG_LOCK=$(mktemp -u -p "${TMPDIR:-/tmp}" -t lock-XXXX) || exit 1
## This is the "job" function: it does whatever work the queue workers are
## supposed to be doing.
## Globals:   TMPDIR (read) - its last path component selects the server
## Arguments: $1 - job id
##            $2 - work item; its first word is the command, the rest are
##                 that command's arguments
## Outputs:   the selected command's output on stdout; start, end and error
##            messages are passed to log()
job() {
  local i=$1
  shift
  local cmd args server
  # -r keeps backslashes in the work item; quoting keeps its whitespace.
  read -r cmd args <<< "$1"
  server=${TMPDIR##*/}
  log "Running job on $server => id:$i\tcmd=$cmd\targs:$args"
  case "$server" in
    SH)
      case "$cmd" in
        *) log "Cmd Error: $cmd $args" ;;
      esac
      ;;
    PT)
      case "$cmd" in
        mscu_encrypt_send) echo mscu encrypt send "$args" ;;
        mscu_decrypt_recv) echo mscu decrypt recv "$args" ;;
        mscu_ccs_send) echo mscu ccs send "$args" ;;
        *) log "Cmd Error: $cmd $args" ;;
      esac
      ;;
    C)
      case "$cmd" in
        *) log "Cmd Error: $cmd $args" ;;
      esac
      ;;
    U)
      case "$cmd" in
        mscu_encrypt_recv) echo mscu encrypt recv "$args" ;;
        mscu_decrypt_send) echo mscu decrypt send "$args" ;;
        mscu_ccs_recv) echo mscu ccs recv "$args" ;;
        *) log "Cmd Error: $cmd $args" ;;
      esac
      ;;
    *) log "Unknown Server Error: $server" ;;
  esac
  log "Ending job on $server => id:$i\tcmd=$cmd\targs:$args"
}
# Write one timestamped line to stdout and append it to the log file.
# Globals:   LOG (read)      - log file that receives the line
#            LOG_LOCK (read) - lock file path; created here, removed afterwards
# Arguments: the message words, joined via "$*"; printf %b expands backslash
#            escapes such as \t in the message
# Returns:   0 on success, otherwise the status of the failed lock or write
log() {
  local rc=0
  (
    # Wait for the lock instead of giving up: with a non-blocking lock a
    # message is silently lost whenever two processes log at the same time.
    flock 9 || exit 1
    printf '%s %b\n' "$(date +"%Y-%m-%d %H:%M:%S.%N")" "$*" | tee -a "$LOG"
  ) 9> "$LOG_LOCK" || rc=$?
  # NOTE(review): removing the lock file between calls means two processes can
  # briefly hold locks on different inodes; lines may then interleave but are
  # not lost. Kept so that no stray lock file is left behind.
  rm -f -- "$LOG_LOCK"
  return "$rc"
}
# Print a fresh, not-yet-created job file name inside the queue directory $1,
# which is looked up next to $TMPDIR (same parent directory).
# Globals:   TMPDIR (read)
# Arguments: $1 - queue directory name
# Outputs:   candidate path <parent of TMPDIR>/<$1>/job-XXXX on stdout
# Returns:   non-zero when $1 is empty or mktemp fails
new_job() {
  [[ -n "$1" ]] || return 1
  # No -t here: with -t, mktemp prefers an exported $TMPDIR over the -p
  # directory, which would put the job file into the wrong queue.
  mktemp -u -p "${TMPDIR%/*}/$1" job-XXXX
}
# Queue a command: write the remaining arguments as one line into a new job
# file of queue $1.
# Arguments: $1   - queue directory name (passed to new_job)
#            $2.. - the command line to queue
# Returns:   non-zero when no job file name could be obtained or the write
#            fails
q() {
  local tmp=$1 job_file
  (( $# > 0 )) && shift
  job_file=$(new_job "$tmp") || return 1
  # printf instead of echo so that a job such as "-n" or "-e" is written
  # verbatim rather than being taken as an echo option.
  # NOTE(review): a consumer reacting to the file's creation may open it
  # before this write has happened - confirm that this is acceptable.
  printf '%s\n' "$*" > "$job_file"
}
## mktemp makes a regular file. Delete that and make a fifo.
## Stop if a fifo cannot be created: everything below depends on both of them.
rm -f -- "$FIFO"
mkfifo -- "$FIFO" || exit 1
printf '%s\n' "$FIFO"
rm -f -- "$JOBS"
mkfifo -- "$JOBS" || exit 1
printf '%s\n' "$JOBS"
## create a trap to cleanup on exit if we fail in the middle.
## cleanup removes the two fifos, the start file and both lock files.
## Globals: FIFO, START, FIFO_LOCK, START_LOCK, JOBS (read)
cleanup() {
  # -f: calling this twice, or after a file has already gone, is not an error.
  rm -f -- "$FIFO" "$START" "$FIFO_LOCK" "$START_LOCK" "$JOBS"
}
trap cleanup EXIT
## This is the worker to read from the queue.
## Globals:   FIFO, FIFO_LOCK, START_LOCK, START (read)
## Arguments: $1 - worker id
## Uses file descriptors 3 (queue), 4 (queue lock) and 5 (start lock) and
## closes them again before returning.
work() {
  local ID=$1
  local work_id work_item read_status
  ## first open the fifo and locks for reading.
  exec 3< "$FIFO"
  exec 4< "$FIFO_LOCK"
  exec 5< "$START_LOCK"
  ## signal the worker has started.
  flock 5                # obtain the start lock
  echo "$ID" >> "$START" # put my worker ID in the start file
  flock -u 5             # release the start lock
  exec 5<&-              # close the start lock file
  echo "worker $ID started"
  while true; do
    ## try to read the queue; the lock makes sure only one worker reads a
    ## line at a time.
    flock 4
    # -r: keep backslashes in the work item exactly as they were queued.
    read -rsu 3 work_id work_item
    read_status=$?
    flock -u 4
    ## Any non-zero status from read indicates an EOF.
    if [[ $read_status -ne 0 ]]; then
      break
    fi
    # got a work item. do the work
    log "worker $ID got work_id=$work_id work_item=$work_item"
    ## Run the job in a subshell. That way any exit calls do not kill
    ## the worker process.
    (job "$work_id" "$work_item")
  done
  # clean up the fd(s)
  exec 3<&-
  exec 4<&-
  log "$ID: done working"
}
## Start the workers.
for ((i = 1; i <= WORKERS; i++)); do
  log "will start worker $i"
  work "$i" &
done
## Open the fifo for writing.
exec 3> "$FIFO"
## Open the start lock for reading
exec 4< "$START_LOCK"
## Wait for the workers to start: each worker appends one line to $START.
while true; do
  flock 4
  started=$(wc -l < "$START")
  flock -u 4
  if [[ $started -eq $WORKERS ]]; then
    break
  fi
  log "waiting, started $started of $WORKERS"
  # Pause between polls so the loop neither spins the CPU nor floods the log.
  sleep 0.1
done
exec 4<&-
## utility function to send the jobs to the workers
## Arguments: $1   - work id
##            $2.. - the work item; all words are joined into one line
## Outputs:   one line "<work id> <work item>" on fd 3 (the fifo), which the
##            caller must have opened for writing
send() {
  local work_id=$1
  shift
  local work_item="$*"
  log "sending $work_id $work_item"
  printf '%s %s\n' "$work_id" "$work_item" >&3 ## the fifo is fd 3
}
## Produce the jobs to run.
## inotifywait reports every file created in $TMPDIR through the $JOBS fifo.
## Each job file is read, deleted and its content sent to the workers. A job
## file containing just "exit" stops the queue.
i=0
inotifywait -m -e create --format '%w%f' "$TMPDIR" > "$JOBS" &
iwait_pid=$!
while IFS= read -r cmdfile; do
  # Look at the file name only: matching the whole path would treat every
  # created file as a job whenever the directory path itself contains "job-".
  [[ "${cmdfile##*/}" == *job-* ]] || continue
  log "Reading command from $cmdfile"
  cmd=$(< "$cmdfile")
  rm -f -- "$cmdfile"
  if [[ "$cmd" == "exit" ]]; then
    kill "$iwait_pid"
    break
  fi
  send "$i" "$cmd"
  i=$((i + 1))
done < "$JOBS"
## close the fifo
exec 3>&-
## disable the cleanup trap
trap '' 0
## It is safe to delete the files because the workers
## already opened them. Thus, only the names are going away
## the actual files will stay there until the workers
## all finish.
cleanup
## now wait for all the workers.
wait
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment