@bokwoon95
Last active January 2, 2020 14:41
Run a Python script concurrently on every file inside a directory (one instance per file), then merge the results into a single CSV
#!/bin/bash
shopt -s extglob
HELP="Usage: multiprocessor.sh --pythonscript <script.py> --datadir <data_directory> [--resultdir <result_directory>] [--removeheaders] [--jobs <num_of_jobs>]

multiprocessor.sh coordinates multiple instances of the same python script to
run concurrently, one instance per file inside the data directory. This allows
you to write python scripts that are single-threaded in nature, and let this
script handle the concurrency.

The only requirement is that the python script takes a file path as its first
argument and prints its result to standard output, so that multiprocessor.sh
can redirect the stdout into a temporary csv file. Once all script instances
have finished, the contents of the temporary csv files are merged into a single
csv file inside the result directory. The merged csv is named after the python
script, followed by a timestamp, a random suffix and a .csv extension. If a
result directory is not provided, the current directory ($(pwd)) is used.

Examples:
    multiprocessor.sh --pythonscript ./scrape.py --datadir ./somedir
    multiprocessor.sh --pythonscript ./scrape.py --datadir ./somedir --resultdir ./results
    multiprocessor.sh --pythonscript ./scrape.py --datadir ./somedir --resultdir ./results --removeheaders

Options:
    --pythonscript  : Path to the python script to be run
    --datadir       : The directory containing the files to be passed to the python script as arguments
                      (top-level files only; *.csv files are skipped)
    --resultdir     : The directory to write the merged csv into
    --removeheaders : When merging, drop the first line (header) of every temporary csv except the first
    --jobs          : The max number of concurrent jobs to run at a time (default 30)
    --help, -h      : Show this help text
"
# Unpack script arguments
argc="$#"
while [ "$#" -gt 0 ]; do
    case "$1" in
        --help|-h) Help='true';;
        --pythonscript) shift; Pythonscript="$1";;
        --datadir) shift; Datadir="$1";;
        --resultdir) shift; Resultdir="$1";;
        --removeheaders) Removeheaders='true';;
        --jobs) shift; Jobs="$1";;
        *) :;; # unknown arguments are ignored
    esac
    shift
done
if [ "$argc" -eq 0 ] || [ "$Help" ]; then
    echo "$HELP"
    exit 0
fi
# Ensure python3 present
if ! command -v python3 >/dev/null 2>&1; then
    echo 'python3 not found' >&2
    exit 1
fi
# Ensure $Pythonscript present
if [ ! -f "$Pythonscript" ]; then
    echo "--pythonscript not found" >&2
    exit 1
fi
# Ensure $Datadir is provided and is a directory
if [ ! -d "$Datadir" ]; then
    echo "--datadir not provided or not a directory" >&2
    exit 1
fi
# Default $Resultdir to the current directory
if [ ! "$Resultdir" ]; then
    Resultdir="$(pwd)"
fi
# Default $Jobs to 30
if [ ! "$Jobs" ]; then
    Jobs='30'
fi
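# Per-run temporary directory: <script name>_<unix timestamp><10 random characters>
Tempdir="/tmp/$(basename "$Pythonscript" .py)_$(date +%s)$(< /dev/urandom tr -dc A-Za-z0-9 | head -c10)"
# invoke_script <pythonscript> <tempdir> <total> <index> <filepath>
# Runs the python script on one file and captures its stdout in a temporary csv.
invoke_script() {
    if [ $# -gt 0 ]; then
        Pythonscript="$1"
        Tempdir="$2"
        total="$3"
        index="$4"
        # Zero-pad the index to the number of digits in the total
        index_padded="$(printf "%0${#total}d" "$index")"
        filepath="$5"
        # Flatten the path into a file name: slashes become dots, leading dots are dropped
        filepath_slashes_to_dots="$(printf '%s\n' "$filepath" | sed -e 's#/#.#g' -e 's#^\.*##')"
        echo "[START][$index_padded/$total] $filepath"
        python3 "$Pythonscript" "$filepath" > "$Tempdir/$filepath_slashes_to_dots.csv"
        echo "[  END][$index_padded/$total] $filepath"
    fi
}
# Export the function so that the bash processes spawned by xargs can call it
export -f invoke_script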
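# Count the input files (tr strips the padding some wc implementations add)
total_files="$(find "$Datadir" -maxdepth 1 -type f -not -name '*.csv' | wc -l | tr -d '[:space:]')"
mkdir -p "$Tempdir"
# Number each file, then fan out to at most $Jobs concurrent workers. Each input
# line has the form "<index> <path>" and is split on its first space in the child
# shell, so paths containing spaces reach invoke_script intact.
find "$Datadir" -maxdepth 1 -type f -not -name '*.csv' \
    | awk '{print NR " " $0}' \
    | xargs -I{} -P"$Jobs" -- bash -c 'invoke_script "$1" "$2" "$3" "${4%% *}" "${4#* }"' _ "$Pythonscript" "$Tempdir" "$total_files" {}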
# Merge the csvs together
merged_csv="$Resultdir/$(basename "$Pythonscript" .py).$(date +%s)$(< /dev/urandom tr -dc A-Za-z0-9 | head -c10).csv"
total_files_len="${#total_files}"
mkdir -p "$Resultdir"
echo "Merging files in $Tempdir/*.csv into $merged_csv..."
if [ "$Removeheaders" ]; then
    # Keep the header (first line) of the first file only.
    # https://stackoverflow.com/questions/16890582/unixmerge-multiple-csv-files-with-same-header-by-keeping-the-header-of-the-firs
    # https://unix.stackexchange.com/questions/3514/how-to-grep-standard-error-stream-stderr
    # stderr goes into the pipe and stdout into the merged csv, so grep can detect
    # the 'Argument list too long' error raised when the glob expands to too many files.
    if awk 'FNR==1 && NR!=1{next;}{print}' "$Tempdir"/*.csv 2>&1 >"$merged_csv" | grep -q 'Argument list too long'; then
        echo 'Too many files for awk to process, switching to slower for-loop...'
        write_header='true'
        index=1
        for file in "$Tempdir"/*.csv; do
            index_padded="$(printf "%0${total_files_len}d" "$index")"
            if [ "$write_header" ]; then
                write_header=''
                echo "[$index_padded/$total_files] cat \"$file\" > \"$merged_csv\""
                cat "$file" > "$merged_csv"
            else
                echo "[$index_padded/$total_files] tail -n +2 \"$file\" >> \"$merged_csv\""
                tail -n +2 "$file" >> "$merged_csv"
            fi
            index=$((index + 1))
        done
    fi
else
    if cat "$Tempdir"/*.csv 2>&1 >"$merged_csv" | grep -q 'Argument list too long'; then
        echo 'Too many files for cat to process, switching to slower for-loop...'
        index=1
        for file in "$Tempdir"/*.csv; do
            index_padded="$(printf "%0${total_files_len}d" "$index")"
            echo "[$index_padded/$total_files] cat \"$file\" >> \"$merged_csv\""
            cat "$file" >> "$merged_csv"
            index=$((index + 1))
        done
    fi
fi
# Report where the results and the leftover temporary files are
total_files="$(find "$Tempdir" -type f -name '*.csv' | wc -l | tr -d '[:space:]')"
total_size="$(du -sh "$Tempdir" | awk '{print $1}')"
echo "Files merged into $merged_csv"
echo "Temporary csv files located in $Tempdir: $total_files total files, $total_size in size. You may delete this directory."
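
The `--removeheaders` merge relies on awk's two line counters: `FNR` restarts at 1 for every input file while `NR` keeps counting across files, so `FNR==1 && NR!=1` matches the first line of every file except the first. The snippet below is a minimal, standalone sketch of that step. The `merge_csvs` helper, the `demo_dir` variable and the sample file names are made up for illustration and are not part of multiprocessor.sh.

```shell
# merge_csvs OUTPUT INPUT...
# Hypothetical helper: concatenate the INPUT csvs into OUTPUT, keeping the
# header (first line) of the first input only.
merge_csvs() {
    local out="$1"
    shift
    # FNR==1 && NR!=1 is true on the first line of every file after the first,
    # so those header lines are skipped; every other line is printed.
    awk 'FNR==1 && NR!=1 {next} {print}' "$@" > "$out"
}

# Two small csvs with identical headers, standing in for the temporary files
demo_dir="$(mktemp -d)"
printf 'name,count\na,1\n' > "$demo_dir/1.csv"
printf 'name,count\nb,2\n' > "$demo_dir/2.csv"

merge_csvs "$demo_dir/merged.csv" "$demo_dir/1.csv" "$demo_dir/2.csv"
cat "$demo_dir/merged.csv"
```

Running it prints `name,count`, `a,1` and `b,2` on three lines: the header appears once. Note that the rule drops the first line of each later file unconditionally, so it only suits python scripts that always print a header row.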