Last active
August 29, 2015 14:15
-
-
Save benjaminballard/a7d8aeba9189250ad051 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash

# Directories where files land (LZ), are staged for loading (LOADING),
# and are archived when done (DONE).
LZ="/tmp/lz"
LOADING="/tmp/loading"
DONE="/tmp/done"

# Extension of files to look for.
EXT="gz"

# JOBLIMIT = number of csvloader jobs at a time
JOBLIMIT=5

# SLEEP = seconds to sleep when no files found
SLEEP=10

# LASTMOD = minutes since last_modified time before files can be loaded
LASTMOD=1

# Create the directories if they don't already exist. Nothing below can work
# without them, so stop right away if they cannot be created.
mkdir -p -- "$LZ" "$LOADING" "$DONE" || {
  echo "cannot create working directories: $LZ $LOADING $DONE" >&2
  exit 1
}
# Watch the LZ directory in an endless loop.
# Regular files directly inside LZ that end in ".$EXT" and have been
# unmodified for more than LASTMOD minutes are passed, one at a time, to
# stage_and_wait. When a pass finds no such file, the loop sleeps for SLEEP
# seconds before looking again.
# Globals:   LZ, EXT, LASTMOD, SLEEP (read)
# Arguments: none
# Returns:   never returns
function watch() {
  local path
  local -a ready

  while true; do
    # Take a complete snapshot before staging anything: staging moves files
    # out of LZ, and the directory should not change underneath find.
    # NUL-delimited output keeps names containing whitespace in one piece.
    # Dotfiles are skipped, matching what a shell "*.$EXT" glob would pick up.
    ready=()
    while IFS= read -r -d '' path; do
      ready+=("$path")
    done < <(find "$LZ" -maxdepth 1 -type f -name "*.$EXT" ! -name '.*' \
      -mmin "+$LASTMOD" -print0 2>/dev/null)

    for path in "${ready[@]}"; do
      stage_and_wait "$path"
    done

    if (( ${#ready[@]} == 0 )); then
      #echo "no files unmodified for > $LASTMOD minute, sleep $SLEEP seconds..."
      sleep "$SLEEP"
    fi
  done
}
# Stage one file found by watch and start loading it in the background.
# The file is moved into a directory under LOADING named after the file minus
# its ".$EXT" suffix. The function then blocks until fewer than JOBLIMIT
# background jobs are running before it launches load for that file.
# Globals:   LOADING, EXT, JOBLIMIT (read)
# Arguments: $1 - path of the file to stage
# Outputs:   "<file name>: loading" on stdout; staging errors on stderr
# Returns:   1 if the file could not be staged (no load is started),
#            otherwise 0 once the background load has been launched
function stage_and_wait() {
  local filepath=$1
  local filename load_dir jobcount

  # stage files for loading
  filename=$(basename -- "$filepath")
  # Strip only the known extension so that names such as votes.1.gz and
  # votes.2.gz get separate staging directories.
  load_dir="$LOADING/${filename%".$EXT"}"
  mkdir -p -- "$load_dir" || return 1
  if ! mv -- "$filepath" "$load_dir"; then
    echo "$filename: could not be staged in $load_dir" >&2
    return 1
  fi

  # wait for turn
  while true; do
    jobcount=$(jobs -pr | wc -l)
    if [[ $jobcount -lt $JOBLIMIT ]]; then
      break
    fi
    sleep 1
  done

  # load files
  echo "$filename: loading"
  load "$load_dir" "$filename" &
}
# Load one staged file into VoltDB and archive the result.
# The file is gunzipped and piped into csvloader (table "votes", report
# directory = the staging directory). The inserted / not-inserted row counts
# are then read from the *_report.log files in that directory and printed, and
# the whole staging directory (data file plus csvloader logs) is moved to
# DONE/<YYYYMMDD>.
# Globals:   DONE (read)
# Arguments: $1 - staging directory that holds the file
#            $2 - name of the gzip file inside that directory
# Outputs:   "<file>: loaded <n> rows (<m> errors)" on stdout;
#            a diagnostic on stderr if gunzip or csvloader exits non-zero
# Returns:   exit status of the final mv
function load() {
  local load_dir=$1
  local filename=$2
  local rows_inserted rows_not_inserted day
  local -a stage_status

  gunzip -c -- "$load_dir/$filename" | csvloader -r "$load_dir" votes &> /dev/null
  stage_status=("${PIPESTATUS[@]}")
  # A failure is reported but the directory is still archived below, so the
  # data file and any logs stay together for inspection.
  if (( stage_status[0] != 0 || stage_status[1] != 0 )); then
    echo "$filename: load failed (gunzip exit ${stage_status[0]}, csvloader exit ${stage_status[1]})" >&2
  fi

  # check for any errors: keep only what follows the last ": " on the line
  rows_inserted=$(grep -h "Number of rows successfully inserted" "$load_dir"/*_report.log \
    | sed -e 's#.*: ##')
  rows_not_inserted=$(grep -h "Number of rows that could not be inserted" "$load_dir"/*_report.log \
    | sed -e 's#.*: ##')
  echo "$filename: loaded $rows_inserted rows ($rows_not_inserted errors)"

  # move entire staging directory containing csv file and csvloader logs to DONE folder
  day=$(date +%Y%m%d)
  mkdir -p -- "$DONE/$day"
  mv -- "$load_dir" "$DONE/$day"
}
# Start watching the landing zone; watch loops forever, so this call does not return.
watch
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment