Last active
April 18, 2019 09:09
-
-
Save ottokruse/65748a3c1a23007c76f362f07f76e4c9 to your computer and use it in GitHub Desktop.
SQS worker written in Bash
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
#######################################
# Print a message prefixed with the current UTC time (ISO-8601, "Z" suffix).
# Arguments: the words of the message; they are joined with single spaces.
# Outputs:   "<timestamp> <message>" on stdout, or only the timestamp when
#            called without arguments.
#            (Append ">> output.log" to the printf calls to log to a file.)
#######################################
log() {
  local timestamp
  timestamp=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
  # Everything is quoted on purpose: an unquoted $@ collapses runs of
  # whitespace and glob-expands characters such as '*' in the message.
  if (( $# == 0 )); then
    printf '%s\n' "$timestamp"
  else
    printf '%s %s\n' "$timestamp" "$*"
  fi
}
#######################################
# Run DAT2CVW.exe under wine on a single input file (the caller logs this
# step as "Creating .cvw").
# Arguments: $1 - path of the input file, passed to the tool via -i
# Outputs:   the tool's own output is discarded; `time` reports the run
#            duration on the shell's stderr.
# Returns:   1 if /mnt/ cannot be entered, otherwise the status of wine.
#######################################
convert_single() {
  local input_file=$1
  # The tool is always started from /mnt/; do not run it elsewhere by accident.
  cd /mnt/ || return 1
  time wine /home/ubuntu/bin/Dat2Cvw_1.4.7.1.A/DAT2CVW.exe -i "$input_file" >/dev/null 2>&1
}
#######################################
# Run DAT2CVW.exe under wine on a pair of input files (the caller logs this
# step as "Creating .cvw").
# Arguments: $1 - path of the first file, passed to the tool via -i
#            $2 - path of the second file, passed to the tool via -g
# Outputs:   the tool's own output is discarded; `time` reports the run
#            duration on the shell's stderr.
# Returns:   1 if /mnt/ cannot be entered, otherwise the status of wine.
#######################################
convert_multi() {
  local first_file=$1
  local second_file=$2
  # The tool is always started from /mnt/; do not run it elsewhere by accident.
  cd /mnt/ || return 1
  time wine /home/ubuntu/bin/Dat2Cvw_1.4.7.1.A/DAT2CVW.exe -i "$first_file" -g "$second_file" >/dev/null 2>&1
}
#######################################
# Run the FLR build of SIMS_GEN12.exe under wine on one measurement file
# (ECU 11, CAN1 1, CAN2 2; the caller logs this step as "resim").
# Arguments: $1 - path handed to the tool via -MeasFilePath
# Outputs:   the tool's own output is discarded; `time` reports the run
#            duration on the shell's stderr.
# Returns:   1 if /mnt/ cannot be entered, otherwise the status of wine.
#######################################
rsim_flr() {
  local meas_file=$1
  # The tool is always started from /mnt/; do not run it elsewhere by accident.
  cd /mnt/ || return 1
  time wine /home/ubuntu/bin/FLR/R12_31_80D11_3_SIMS/SIMS_GEN12.exe \
    -ECU 11 -OnlyNewCan -VehicleData AMP -CAN1 1 -CAN2 2 \
    -RegrTest -DisableExport -KeepAmps \
    -MeasFilePath "$meas_file" -OutCvw >/dev/null 2>&1
}
#######################################
# Run the FSRx build of SIMS_GEN12.exe under wine on one measurement file
# with the FSRL settings (ECU 10, CAN1 3; the caller logs this as "resim").
# Arguments: $1 - path handed to the tool via -MeasFilePath
# Outputs:   the tool's own output is discarded; `time` reports the run
#            duration on the shell's stderr.
# Returns:   1 if /mnt/ cannot be entered, otherwise the status of wine.
#######################################
rsim_fsrl() {
  local meas_file=$1
  # The tool is always started from /mnt/; do not run it elsewhere by accident.
  cd /mnt/ || return 1
  time wine /home/ubuntu/bin/FSRx/R12_31_70D11_3_SIMS/SIMS_GEN12.exe \
    -ECU 10 -OnlyNewCan -VehicleData AMP -CAN1 3 \
    -RegrTest -DisableExport -KeepAmps \
    -MeasFilePath "$meas_file" -OutCvw >/dev/null 2>&1
}
#######################################
# Run the FSRx build of SIMS_GEN12.exe under wine on one measurement file
# with the FSRR settings (ECU 12, CAN1 4; the caller logs this as "resim").
# Arguments: $1 - path handed to the tool via -MeasFilePath
# Outputs:   the tool's own output is discarded; `time` reports the run
#            duration on the shell's stderr.
# Returns:   1 if /mnt/ cannot be entered, otherwise the status of wine.
#######################################
rsim_fsrr() {
  local meas_file=$1
  # The tool is always started from /mnt/; do not run it elsewhere by accident.
  cd /mnt/ || return 1
  time wine /home/ubuntu/bin/FSRx/R12_31_70D11_3_SIMS/SIMS_GEN12.exe \
    -ECU 12 -OnlyNewCan -VehicleData AMP -CAN1 4 \
    -RegrTest -DisableExport -KeepAmps \
    -MeasFilePath "$meas_file" -OutCvw >/dev/null 2>&1
}
#######################################
# Handle one SQS message end to end: download the input file(s), convert
# them, run the resim matching the file type, upload the result and delete
# the message from the queue.
# Globals:   DOWNLOAD_DIR, SQS_QUEUE_URL, AWS_REGION (read)
#            RECEIPT_HANDLES (appended to; when this function runs as a
#            background job the append only affects that job's own copy)
# Arguments: $1 - one SQS message as JSON; its .Body must be a JSON string
#                 holding filetype, filename1, filename2 and prefix
# Returns:   exits the current (sub)shell with status 1 when the body cannot
#            be parsed or when download, upload or message deletion fails.
#######################################
process_message() {
  local message=$1
  local receipt_handle body filetype filename1 filename2 prefix cvw_file

  # Feed jq through quoted here-strings: `echo $MESSAGE` would word-split and
  # glob-expand the JSON before jq ever sees it.
  receipt_handle=$(jq -cr '.ReceiptHandle' <<<"$message")
  RECEIPT_HANDLES+=("$receipt_handle")
  if ! body=$(jq -cr '.Body | fromjson' <<<"$message"); then
    log "Could not parse the body of the SQS message"
    exit 1
  fi
  filetype=$(jq -cr '.filetype' <<<"$body")
  filename1=$(jq -cr '.filename1' <<<"$body")
  # jq prints the literal string "null" when a key is absent.
  filename2=$(jq -cr '.filename2' <<<"$body")
  prefix=$(jq -cr '.prefix' <<<"$body")

  log "Processing ${filetype}: filename1=${filename1}, filename2=${filename2} ..."
  download_files "$filename1" "$filename2" "$prefix" || exit 1

  log "Creating .cvw for filename1=${filename1}, filename2=${filename2} ..."
  if [[ $filename2 == 'null' ]]; then
    convert_single "${DOWNLOAD_DIR}/${filename1}"
  else
    convert_multi "${DOWNLOAD_DIR}/${filename1}" "${DOWNLOAD_DIR}/${filename2}"
  fi
  log "Done creating .cvw for filename1=${filename1}, filename2=${filename2}"

  # The converter output is expected next to the input, with a .cvw extension.
  cvw_file="${DOWNLOAD_DIR}/${filename1%.*}.cvw"
  log "Doing resim of $cvw_file ..."
  case "$filetype" in
    FLR)
      rsim_flr "$cvw_file"
      ;;
    FSRL)
      rsim_fsrl "$cvw_file"
      ;;
    FSRR)
      rsim_fsrr "$cvw_file"
      ;;
    *)
      log "Unknown filetype '${filetype}'; no resim was run"
      ;;
  esac
  log "Done doing resim of $cvw_file"

  upload_file "$DOWNLOAD_DIR" "$filename1" || exit 1
  # Delete the message only after a successful upload, so a failed run leaves
  # it on the queue.
  aws sqs delete-message --queue-url "$SQS_QUEUE_URL" --region "$AWS_REGION" --receipt-handle "$receipt_handle" || exit 1
  log "Done processing ${filetype}: filename1=${filename1}, filename2=${filename2}"
}
#######################################
# Upload the resim output belonging to an input file to the output bucket.
# Output files are those in the download directory named
# "<filename1 without extension>_*_Out.cvw"; each one is uploaded under the
# same relative name. Should several match, all of them are uploaded.
# Globals:   OUTPUT_BUCKET_NAME (read)
# Arguments: $1 - download directory (becomes the working directory)
#            $2 - name of the first input file, relative to $1
# Returns:   exits the current (sub)shell with status 1 when the directory
#            cannot be entered, no output file exists or an upload fails.
#######################################
upload_file() {
  local download_dir=$1
  local stem=${2%.*}
  local -a outputs
  local output

  cd "$download_dir" || exit 1
  # Let the shell glob instead of parsing `ls`; only the wildcard is left
  # unquoted so the stem itself is always taken literally.
  outputs=("${stem}"_*_Out.cvw)
  # Without a match the pattern stays behind as a literal, non-existent name.
  if [[ ! -e ${outputs[0]} ]]; then
    log "No output file matching ${stem}_*_Out.cvw found in ${download_dir}"
    exit 1
  fi

  for output in "${outputs[@]}"; do
    log "Uploading s3://${OUTPUT_BUCKET_NAME}/${output} ..."
    aws s3 cp --only-show-errors "$output" "s3://${OUTPUT_BUCKET_NAME}/${output}" || exit 1
    log "Uploaded s3://${OUTPUT_BUCKET_NAME}/${output}"
  done
}
#######################################
# Download up to two input files from the input bucket into DOWNLOAD_DIR,
# skipping files that are already present locally.
# Globals:   DOWNLOAD_DIR, INPUT_BUCKET_NAME (read)
# Arguments: $1 - first file name (S3 key, also the path below DOWNLOAD_DIR)
#            $2 - second file name; empty or the literal "null" means none
#            $3 - prefix; the directory DOWNLOAD_DIR/<prefix> is created
# Returns:   exits the current (sub)shell with status 1 if a download fails.
#######################################
download_files() {
  local prefix=$3
  local filename target

  mkdir -p -- "${DOWNLOAD_DIR}/${prefix}"

  # Iterate over the quoted arguments so names containing spaces stay intact.
  for filename in "$1" "$2"; do
    if [[ -z $filename || $filename == 'null' ]]; then
      continue
    fi
    target="${DOWNLOAD_DIR}/${filename}"
    if [[ -f $target ]]; then
      log "Already downloaded ${filename}"
      continue
    fi
    log "Downloading s3://${INPUT_BUCKET_NAME}/${filename} ..."
    aws s3 cp --only-show-errors "s3://${INPUT_BUCKET_NAME}/${filename}" "$target" || exit 1
    log "Downloaded s3://${INPUT_BUCKET_NAME}/${filename}"
  done
}
#######################################
# Delete downloaded files until the file system holding DOWNLOAD_DIR is at
# most 80% full. Each round removes the first (4 * $1) regular files below
# DOWNLOAD_DIR whose name contains a dot, in `sort` order of their paths.
# Globals:   DOWNLOAD_DIR (read)
# Arguments: $1 - maximum number of messages handled per round
# Returns:   0 once usage is at or below the threshold; 1 when usage is
#            still too high but there is nothing left to delete.
#######################################
ensure_disk_space() {
  local -r max_usage_percent=80
  local max_messages=$1
  local delete_count=$(( max_messages * 4 ))
  local usage
  local -a doomed

  # Column 5 of the last `df -P` line is the usage, e.g. "42%".
  usage=$(df -P "$DOWNLOAD_DIR" | tail -n 1 | awk '{print $5}')
  while [[ ${usage%\%} -gt $max_usage_percent ]]; do
    log "Disk usage is at ${usage}. Removing ${delete_count} downloaded files ..."
    # Read the candidates line by line so names with spaces survive, and
    # restrict the search to regular files so directories are never passed
    # to rm.
    mapfile -t doomed < <(
      find "$DOWNLOAD_DIR" -type f -name "*.*" | sort | head -n "$delete_count"
    )
    if (( ${#doomed[@]} == 0 )); then
      # Something other than the downloads fills the disk; looping on would
      # never terminate.
      log "No downloaded files left to remove; disk usage is still at ${usage}"
      return 1
    fi
    rm -f -- "${doomed[@]}"
    log "Done removing files"
    usage=$(df -P "$DOWNLOAD_DIR" | tail -n 1 | awk '{print $5}')
    log "Disk usage is now at ${usage}"
  done
}
#######################################
# Make every message listed in RECEIPT_HANDLES visible on the queue again by
# setting its visibility timeout to 0.
# Globals:   RECEIPT_HANDLES, SQS_QUEUE_URL, AWS_REGION (read)
# Arguments: none
# Outputs:   one log line before and one after each attempt.
#######################################
set_sqs_visibility_zero() {
  # A local loop variable keeps any global RECEIPT_HANDLE untouched.
  local handle
  for handle in "${RECEIPT_HANDLES[@]}"; do
    log "Putting message back on SQS queue ..."
    if aws sqs change-message-visibility --queue-url "$SQS_QUEUE_URL" --region "$AWS_REGION" --receipt-handle "$handle" --visibility-timeout 0; then
      log "Done putting message back on SQS queue ..."
    else
      log "Failed to put message back on SQS queue"
    fi
  done
}
#######################################
# Main loop: fetch a batch of messages from SQS, process them in parallel
# background jobs, put the messages of failed jobs back on the queue and
# free disk space before the next round.
# Globals:   SQS_QUEUE_URL, AWS_REGION (read)
#            MAX_NUMBER_OF_MESSAGES (read; defaults to the number of lines
#              containing "yes" in `lscpu --extended`, i.e. one per CPU
#              reported as online)
#            SINGLE_RUN (read; when non-empty the loop stops after one round)
#            RECEIPT_HANDLES (written; holds the handles of failed jobs when
#              set_sqs_visibility_zero is called)
# Arguments: none
#######################################
sqs_worker() {
  # SQS rejects receive-message requests asking for more than 10 messages.
  local -r sqs_batch_limit=10
  local batch_size sqs_response message i
  local -a messages handles pids failed_handles

  if [[ -z $MAX_NUMBER_OF_MESSAGES ]]; then
    MAX_NUMBER_OF_MESSAGES=$(lscpu --extended | grep -c "yes")
  fi
  batch_size=$MAX_NUMBER_OF_MESSAGES
  if (( batch_size > sqs_batch_limit )); then
    batch_size=$sqs_batch_limit
  fi

  while true; do
    log "Fetching (maximally) ${batch_size} messages from SQS ..."
    RECEIPT_HANDLES=()
    sqs_response=$(aws sqs receive-message --queue-url "$SQS_QUEUE_URL" --region "$AWS_REGION" --max-number-of-messages="$batch_size" --wait-time-seconds=20)

    # One compact JSON document per line; reading whole lines keeps messages
    # whose body contains whitespace in one piece.
    messages=()
    if [[ -n $sqs_response ]]; then
      mapfile -t messages < <(jq -c '.Messages[]?' <<<"$sqs_response")
    fi
    if (( ${#messages[@]} == 0 )); then
      log "No (more) messages in SQS. All done, for now. Will sleep for 10 secs ..."
      sleep 10
    fi

    handles=()
    pids=()
    for message in "${messages[@]}"; do
      # Record the receipt handle here: the job below runs in a subshell, so
      # nothing it stores in RECEIPT_HANDLES reaches this shell.
      handles+=("$(jq -r '.ReceiptHandle' <<<"$message")")
      process_message "$message" &
      pids+=("$!")
      sleep 0.5 # Prevent duplicate downloads by giving each job a bit of time for itself
    done

    # A bare `wait` always returns 0; wait for each job by PID to learn
    # which ones failed.
    failed_handles=()
    for i in "${!pids[@]}"; do
      wait "${pids[i]}" || failed_handles+=("${handles[i]}")
    done
    if (( ${#failed_handles[@]} > 0 )); then
      RECEIPT_HANDLES=("${failed_handles[@]}")
      set_sqs_visibility_zero
    fi

    log "Making sure we have enough disk space left for another round ..."
    ensure_disk_space "$MAX_NUMBER_OF_MESSAGES"
    log "Disk space ensured"
    log
    log "============="
    log
    if [[ -n $SINGLE_RUN ]]; then
      break
    fi
  done
}
# Entry point: start the worker loop.
sqs_worker
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment