@benjaminballard
Last active August 29, 2015 14:15
#!/usr/bin/env bash
# directories where files land, are staged for loading, and are archived when done
LZ="/tmp/lz"
LOADING="/tmp/loading"
DONE="/tmp/done"
# extension of files to look for
EXT="gz"
# JOBLIMIT = number of csvloader jobs at a time
JOBLIMIT=5
# SLEEP = seconds to sleep when no files found
SLEEP=10
# LASTMOD = minutes since last_modified time before files can be loaded
LASTMOD=1
# create directories if they don't already exist
mkdir -p $LZ $LOADING $DONE
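# Flow of a file through the script:
#   1. a file like <name>.gz lands in $LZ
#   2. once it has been unmodified for LASTMOD minutes, it is moved to $LOADING/<name>/
#   3. it is gunzipped and loaded into the "votes" table with csvloader
#   4. the whole $LOADING/<name>/ directory (csv file plus csvloader logs) is archived under $DONE/<YYYYMMDD>/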
# This function watches the LZ directory in an endless loop
# when files arrive and have been unmodified for more than LASTMOD minutes,
# they are loaded, up to JOBLIMIT at a time
function watch() {
    while true
    do
        # find files in the LZ folder with the right extension that have not been modified for LASTMOD minutes
        FILES=$(find $LZ/*.$EXT -type f -mmin +$LASTMOD 2>/dev/null)
        for f in $FILES; do
            stage_and_wait $f
        done
        if [ -z "$FILES" ]; then
            #echo "no files unmodified for > $LASTMOD minutes, sleep $SLEEP seconds..."
            sleep $SLEEP
        fi
    done
}
# This function is called for a file that has been found by watch.
# It first moves the file to a unique directory under LOADING and then waits until
# there are fewer than JOBLIMIT jobs running before it proceeds
function stage_and_wait() {
    # stage files for loading
    FILEPATH=$1
    FILENAME=$(basename $FILEPATH)
    # strip the extension(s) to get a unique staging directory name for this file
    NO_EXT=${FILENAME%%.*}
    LOAD_DIR=$LOADING/$NO_EXT
    mkdir -p $LOAD_DIR
    mv $FILEPATH $LOAD_DIR
    # wait for turn: count running background jobs and proceed once below JOBLIMIT
    while true; do
        jobcount=$(jobs -pr | wc -l)
        if [[ $jobcount -lt $JOBLIMIT ]]; then
            break
        fi
        sleep 1
    done
    # load the file in the background so multiple loads can run concurrently
    echo "$FILENAME: loading"
    load $LOAD_DIR $FILENAME &
}
# This function is called when a file is ready to be loaded into VoltDB.
# It gunzips the file, loads it with csvloader, and checks the results before archiving the files.
function load() {
    LOAD_DIR=$1
    FILENAME=$2
    # decompress the file and stream it into csvloader, loading the "votes" table
    # -r writes the csvloader report/log files into LOAD_DIR
    gunzip -c $LOAD_DIR/$FILENAME | csvloader -r $LOAD_DIR votes &> /dev/null
    # check the csvloader report for the number of rows inserted and rejected
    rows_inserted=$(cat $LOAD_DIR/*_report.log | grep "Number of rows successfully inserted" | sed -e 's#.*: ##')
    rows_not_inserted=$(cat $LOAD_DIR/*_report.log | grep "Number of rows that could not be inserted" | sed -e 's#.*: ##')
    echo "$FILENAME: loaded $rows_inserted rows ($rows_not_inserted errors)"
    # move the entire LOAD_DIR containing the csv file and csvloader logs to a dated DONE folder
    DATE=$(date +%Y%m%d)
    mkdir -p $DONE/$DATE
    mv $LOAD_DIR $DONE/$DATE
}
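# For reference, the parsing above expects lines in the csvloader *_report.log
# of the form below (the counts shown are only illustrative values):
#   Number of rows successfully inserted: 1000
#   Number of rows that could not be inserted: 0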
# call the watch function
watch
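
# Example usage (a hypothetical walk-through; assumes this script is saved as
# watch_and_load.sh, a VoltDB server is running locally, csvloader is on the
# PATH, and the "votes" table accepts the columns in the sample csv):
#   ./watch_and_load.sh &
#   printf '1,5085551234,MA\n2,4135555678,CA\n' > votes_batch1.csv
#   gzip votes_batch1.csv
#   mv votes_batch1.csv.gz /tmp/lz/
#   # after the file has been unmodified for LASTMOD minutes it is loaded, and the
#   # csv plus csvloader logs end up under /tmp/done/<YYYYMMDD>/votes_batch1/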