Last active
August 27, 2021 11:59
-
-
Save pablosjv/da9c8b578a6943a85fbf9cb09c247ffe to your computer and use it in GitHub Desktop.
Large Scale Pytorch Inference Pipeline: Spark vs Dask - Code Examples
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Submit a Dask application to YARN and (for remote deploy mode) wait for it
# to finish, failing the script if the application did not succeed.
#
# Strict mode: abort on command errors (-e), unset variables (-u), and
# failures in any stage of a pipeline (pipefail).
set -euo pipefail
# Poll `dask-yarn status` until the given application reports FINISHED.
# Arguments:
#   $1 - YARN application id
# Outputs: progress messages on stdout; blocks until the state is FINISHED.
check_finish() {
    local app_id=$1
    # Column 3 of the status table holds the application state. Errors from
    # `dask-yarn` itself are silenced on purpose: an unreachable status
    # endpoint is treated as "not finished yet" and retried after 5s.
    while ! dask-yarn status "${app_id}" 2>/dev/null | awk '{print $3}' | grep -q FINISHED; do
        echo "Application ${app_id} not finished"
        sleep 5
    done
    echo "Application ${app_id} has finished"
}
# Check whether a finished application ended in the SUCCEEDED state.
# Arguments:
#   $1 - YARN application id
# Returns: 0 if the final status is SUCCEEDED, 1 otherwise.
check_succeeded() {
    local app_id=$1
    # Column 4 of the status table holds the final status.
    if dask-yarn status "${app_id}" 2>/dev/null | awk '{print $4}' | grep -q SUCCEEDED; then
        echo "SUCCESS: Application ${app_id} SUCCEEDED!"
        return 0
    else
        # Diagnostics go to stderr so callers capturing stdout stay clean.
        echo "PANIC: Application ${app_id} NOT SUCCEEDED" >&2
        return 1
    fi
}
echo "----------------------------------------------------------------------"
echo "----------------------- Start Dask Yarn Submit -----------------------"
echo "----------------------------------------------------------------------"

# Default values of arguments
APP_NAME="dask"
CODE_LOCATION="not-set"
ENVIRONMENT_URI="not-set"
DASK_YARN_ARGS=""
DEPLOY_MODE=local

# Parse command-line flags.
# NOTE(review): the previous `for arg in "$@" ... shift` pattern iterated a
# snapshot of the arguments while shifting the live positional parameters,
# which mis-parses when a flag's value looks like another flag. A while/case
# loop over "$1" is the robust idiom.
while [ "$#" -gt 0 ]; do
    case "$1" in
    --local-mode)
        DEPLOY_MODE=local
        shift
        ;;
    --remote-mode)
        # Backward-compatible addition: previously nothing could set
        # DEPLOY_MODE=remote, leaving the post-submit status check dead code.
        DEPLOY_MODE=remote
        shift
        ;;
    -n | --name)
        APP_NAME="$2"
        shift 2 # flag + value
        ;;
    -c | --code_location)
        CODE_LOCATION="$2"
        shift 2
        ;;
    -v | --environment_uri)
        ENVIRONMENT_URI="$2"
        shift 2
        ;;
    -e | --env-var)
        # Expects KEY=VALUE. `export` handles that form directly; the
        # original `eval "export $2"` was a shell-injection hazard.
        export "${2?KEY=VALUE required for --env-var}"
        shift 2
        ;;
    -y | --yarn-args)
        DASK_YARN_ARGS=$2
        shift 2
        ;;
    *)
        # Unrecognized tokens are skipped, matching the original leniency.
        shift
        ;;
    esac
done
APP_LOCAL_PATH="${HOME}/dask-submit-launcher/${APP_NAME}"

# Echo the effective configuration for debugging.
echo "APP_NAME=${APP_NAME}"
echo "CODE_LOCATION=${CODE_LOCATION}"
echo "ENVIRONMENT_URI=${ENVIRONMENT_URI}"
echo "DEPLOY_MODE=${DEPLOY_MODE}"
echo "DASK_YARN_ARGS=${DASK_YARN_ARGS}"

echo ">>> Creating path to store temporal files in ${APP_LOCAL_PATH}"
# `mkdir -p` creates missing parents, so one call covers both directories.
mkdir -p "${APP_LOCAL_PATH}/env"

echo ">>> Downloading needed files"
aws s3 cp "${CODE_LOCATION}" "${APP_LOCAL_PATH}/run.py"
aws s3 cp "${ENVIRONMENT_URI}" "${APP_LOCAL_PATH}/env.tar.gz"

echo ">>> Create environment for master node"
tar --overwrite -xzf "${APP_LOCAL_PATH}/env.tar.gz" -C "${APP_LOCAL_PATH}/env"
# shellcheck disable=SC1091
source "${APP_LOCAL_PATH}/env/bin/activate"

echo ">>> Launching Dask Yarn Submit"
echo "dask-yarn submit --deploy-mode ${DEPLOY_MODE} --environment ${ENVIRONMENT_URI} ${DASK_YARN_ARGS} ${APP_LOCAL_PATH}/run.py"
# DASK_YARN_ARGS is intentionally unquoted: it carries extra
# whitespace-separated flags for dask-yarn.
# shellcheck disable=SC2086
APPLICATION_ID=$(
    dask-yarn submit \
        --deploy-mode "${DEPLOY_MODE}" \
        --environment "${ENVIRONMENT_URI}" \
        ${DASK_YARN_ARGS} \
        "${APP_LOCAL_PATH}/run.py"
)
echo "${APPLICATION_ID}"
# NOTE: status polling only applies to remote deploy mode, where the submit
# command returns immediately with the application id instead of blocking.
# (Original comment said "deploy-mode=cluster"; the code checks "remote".)
if [ "${DEPLOY_MODE}" = "remote" ]; then
    check_finish "${APPLICATION_ID}"
    if ! check_succeeded "${APPLICATION_ID}"; then
        echo ">>> ERROR: Dask Job has failed!!!" >&2
        exit 1
    fi
fi
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment