Skip to content

Instantly share code, notes, and snippets.

@trink
Created February 21, 2018 20:35
Show Gist options
  • Save trink/399e8b923bcbc7095afba1ba0870d10a to your computer and use it in GitHub Desktop.
Save trink/399e8b923bcbc7095afba1ba0870d10a to your computer and use it in GitHub Desktop.
S3 Uploader for Hindsight Output
#!/bin/bash
export s3_bucket="$1"
export concurrent_jobs="$2"
export use_compression="$3"
export s3_buffer_dirs="${@:4}"
export STORAGE_CLASS="STANDARD"
function upload_file () {
cmd="$1"
set -x
if [[ "$file" =~ parquet || "$use_compression" != "true" ]] ; then
$cmd
else
pigz -c ${file} | $cmd
fi
set +x
}
export -f upload_file
function process_file () {
file=$1
if ! [[ -f $file ]] ; then
echo "$file does not exist"
return 0
fi
base=$(basename $file .done)
s3_bucket_prefix=$(dirname $file)
for dir in $s3_buffer_dirs ; do
s3_bucket_prefix=${s3_bucket_prefix#${dir}/}
done
s3_object_base="$(echo $base |sed -e 's|\+|/|g')"
s3_object_name="s3://${s3_bucket}/${s3_bucket_prefix}/${s3_object_base}"
# this logic is somewhat convoluted to avoid using eval
input="-"
output="${s3_object_name}.gz"
encoding="--content-encoding gzip"
if [[ "$file" =~ parquet || "$use_compression" != "true" ]] ; then
input="${file}"
output="${s3_object_name}"
encoding=""
fi
cmd="aws s3 cp ${encoding} --quiet --storage-class ${STORAGE_CLASS} ${input} ${output}"
TIMEOUT=1
until upload_file "$cmd" ; do
TIMEOUT=$(( $TIMEOUT * 2 ))
echo "${file} failed, retrying in ${TIMEOUT}s"
sleep $TIMEOUT
done
rm "$file"
}
export -f process_file
function process_files () {
xargs -P $concurrent_jobs -n 1 -I {} bash -c 'process_file "$@"' _ {}
}
export -f process_files
function finalize () {
find ${s3_buffer_dirs} -name '*.done' | process_files
}
trap finalize EXIT INT TERM
inotifywait -q -e moved_to -r -m $s3_buffer_dirs | stdbuf -oL -eL grep '\.done$' | stdbuf -oL -eL cut -d' ' -f1,3 --output-delimiter '' | stdbuf -oL -eL tr -d '\0' | process_files
[Unit]
Description=s3-upload
After=syslog.target network.target
Before=<%= @before_service %>
[Service]
Type=simple
ExecStart=/usr/local/bin/s3_upload <%= @s3_bucket %> <%= @s3_concurrent_jobs %> <%= @use_compression %> <%= @s3_buffer_dirs.join(" ") %>
TimeoutStopSec=900
Restart=always
StartLimitInterval=0
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment