Run multiple instances of a python script on each file inside a directory concurrently, then merge the results
#!/bin/bash
shopt -s extglob
HELP="Usage: multiprocessor.sh --pythonscript <script.py> --datadir <data_directory> [--resultdir <result_directory> --removeheaders --jobs <num_of_jobs>]

multiprocessor.sh coordinates multiple instances of the same python script to
run concurrently on files inside the data directory. This allows you to write
python scripts that are single-threaded in nature, and let this script handle
the concurrency.

The only requirement is that the python script print its result to standard
output, so that multiprocessor.sh can redirect the stdout into a temporary
csv file. Once all script instances have finished, the contents of the
temporary csv files will be merged into a single csv file inside the result
directory. The merged csv will have the same name as the python script, but
with a *.csv extension. If a result directory is not provided, the current
directory ($(pwd)) will be used.

Examples:
    multiprocessor.sh --pythonscript ./scrape.py --datadir ./somedir
    multiprocessor.sh --pythonscript ./scrape.py --datadir ./somedir --resultdir ./results
    multiprocessor.sh --pythonscript ./scrape.py --datadir ./somedir --resultdir ./results --removeheaders

Options:
    --pythonscript  : Path to the python script to be run
    --datadir       : The directory containing the files to be passed to the python script as arguments
    --resultdir     : The directory to dump the merged csv into
    --removeheaders : Remove headers from all files except the first
    --jobs          : The max number of concurrent jobs to run at a time (default 30)
"
# Unpack script arguments
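# The loop below uses the portable `: "$((...))"` idiom for arithmetic: the
# arithmetic expansion performs the assignment and the no-op `:` discards the
# expanded value. Options that take a value consume it with an extra shift.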
argc="$#";: "$((i=0))" | |
while [ "$((i))" -lt "$argc" ]; do | |
case "$1" in | |
--help|-h) Help='true';; | |
--pythonscript) shift;: $((i=i+1)); Pythonscript="$1";; | |
--datadir) shift;: $((i=i+1)); Datadir="$1";; | |
--resultdir) shift;: $((i=i+1)); Resultdir="$1";; | |
--removeheaders) Removeheaders='true';; | |
--jobs) shift;: $((i=i+1)); Jobs="$1";; | |
*) :;; | |
esac | |
shift;: "$((i=i+1))" | |
done | |
if [ "$argc" -eq 0 ] || [ "$Help" ] ; then | |
echo "$HELP" | |
exit 0 | |
fi | |
# Ensure python3 present
if ! command -v python3 >/dev/null 2>&1; then
    echo 'python3 not found'
    exit 1
fi
# Ensure $Pythonscript present
if [ ! -f "$Pythonscript" ]; then
    echo "--pythonscript not found"
    exit 1
fi
# Ensure $Datadir present and is a directory
if [ ! "$Datadir" ]; then
    echo "--datadir not provided"
    exit 1
elif [ ! -d "$Datadir" ]; then
    echo "--datadir $Datadir is not a directory"
    exit 1
fi
# Ensure valid $Resultdir
if [ ! "$Resultdir" ]; then
    Resultdir="$(pwd)"
fi
# Ensure valid $Jobs
if [ ! "$Jobs" ]; then
    Jobs='30'
fi
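# Unique scratch directory per run: script name + unix timestamp + 10 random
# alphanumeric characters, so that concurrent runs of the same script cannot
# clobber each other's temporary csv files.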
Tempdir="/tmp/$(basename $Pythonscript .py)_$(date +%s)$(< /dev/urandom tr -dc A-Za-z0-9 | head -c10)" | |
invoke_script() {
    if [ $# -gt 0 ]; then
        Pythonscript="$1"
        Tempdir="$2"
        total="$3"
        index="$4"
        # Zero-pad the index to the width of the total count, e.g. 007/123
        index_padded="$(printf "%0${#total}d" "$index")"
        filepath="$5"
        # Flatten the filepath into a filename: slashes become dots, leading dots dropped
        filepath_slashes_to_dots="$(echo "$filepath" | sed -e 's#/#.#g' -e 's#^\.*##')"
        echo "[START][$index_padded/$total] $filepath"
        python3 "$Pythonscript" "$filepath" > "$Tempdir/$filepath_slashes_to_dots.csv"
        echo "[ END][$index_padded/$total] $filepath"
    fi
}
export -f invoke_script
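# Dispatch: number each data file with awk ("<index> <filepath>" per line),
# then let xargs run up to $Jobs copies of invoke_script in parallel. Each
# numbered line is substituted for {} unquoted, so the inner bash word-splits
# it back into the index and filepath arguments; `export -f` above is what
# makes invoke_script visible to those child bash processes. Note that this
# splitting means filepaths containing whitespace are not supported.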
total_files="$(find $Datadir -maxdepth 1 -type f -not -name '*.csv' | wc -l)" | |
mkdir -p "$Tempdir" | |
find $Datadir -maxdepth 1 -type f -not -name '*.csv' | awk '{print NR " " $0}' | xargs -I{} -P$Jobs -- bash -c "invoke_script $Pythonscript $Tempdir $total_files {}" | |
# Merge the csvs together
merged_csv="$Resultdir/$(basename "$Pythonscript" .py).$(date +%s)$(< /dev/urandom tr -dc A-Za-z0-9 | head -c10).csv"
total_files_len="${#total_files}"
mkdir -p "$Resultdir"
echo "Merging files in $Tempdir/*.csv into $merged_csv..."
if [ "$Removeheaders" ]; then | |
# https://stackoverflow.com/questions/16890582/unixmerge-multiple-csv-files-with-same-header-by-keeping-the-header-of-the-firs | |
# https://unix.stackexchange.com/questions/3514/how-to-grep-standard-error-stream-stderr | |
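    # awk skips the first line of every file except the very first (FNR resets
    # per file, NR does not), keeping a single header row. The redirections
    # are ordered so that stderr goes into the pipe while stdout goes into the
    # merged file: if the *.csv glob expands past the kernel's exec argument
    # limit, the shell's "Argument list too long" error is caught by grep and
    # we fall back to a one-file-at-a-time loop.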
    if awk 'FNR==1 && NR!=1{next;}{print}' "$Tempdir"/*.csv 2>&1 >"$merged_csv" | grep -q 'Argument list too long'; then
        echo 'Too many files for awk to process, switching to slower for-loop...'
        write_header='true'
        : "$((index=1))"
        for file in "$Tempdir"/*.csv; do
            index_padded="$(printf "%0${total_files_len}d" "$index")"
            if [ "$write_header" ]; then
                write_header=''
                echo "[$index_padded/$total_files] cat \"$file\" > \"$merged_csv\""
                cat "$file" > "$merged_csv"
            else
                echo "[$index_padded/$total_files] tail -n +2 \"$file\" >> \"$merged_csv\""
                tail -n +2 "$file" >> "$merged_csv"
            fi
            : "$((index=index+1))"
        done
    fi
else
    if cat "$Tempdir"/*.csv 2>&1 >"$merged_csv" | grep -q 'Argument list too long'; then
        echo 'Too many files for cat to process, switching to slower for-loop...'
        : "$((index=1))"
        for file in "$Tempdir"/*.csv; do
            index_padded="$(printf "%0${total_files_len}d" "$index")"
            echo "[$index_padded/$total_files] cat \"$file\" >> \"$merged_csv\""
            cat "$file" >> "$merged_csv"
            : "$((index=index+1))"
        done
    fi
fi
total_files="$(find $Tempdir -type f -name '*.csv' | wc -l)" | |
total_size="$(du -sh "$Tempdir" | awk '{print $1}')" | |
echo "Files merged into $merged_csv" | |
echo "Temporary csv files located in $Tempdir: $total_files total files, $total_size in size. You may delete this directory." |