Skip to content

Instantly share code, notes, and snippets.

@pablosjv
Last active August 27, 2021 11:59
Show Gist options
  • Save pablosjv/da9c8b578a6943a85fbf9cb09c247ffe to your computer and use it in GitHub Desktop.
Save pablosjv/da9c8b578a6943a85fbf9cb09c247ffe to your computer and use it in GitHub Desktop.
Large-Scale PyTorch Inference Pipeline: Spark vs. Dask - Code Examples
#!/usr/bin/env bash
set -e
#######################################
# Block until a YARN application reaches a terminal state.
#
# Runs `dask-yarn status` every 5 seconds and looks at the third
# whitespace-separated column of its output. The wait ends on FINISHED,
# and also on FAILED or KILLED: those are terminal YARN application
# states as well, so waiting for FINISHED alone would poll forever when
# the application dies. The matched state is printed, as before.
# Arguments: $1 - application id to poll
# Outputs:   progress messages and the matched state on stdout
# Returns:   0 once a terminal state has been observed
#######################################
check_finish() {
  # local: do not clobber a caller's global ID
  local app_id=$1
  # stderr of dask-yarn is discarded so transient status errors do not
  # flood the log; such a poll simply counts as "not finished yet".
  while ! dask-yarn status "${app_id}" 2>/dev/null |
    awk '$3 ~ /^(FINISHED|FAILED|KILLED)$/ { print $3; hit = 1 }
         END { exit !hit }'; do
    echo -e "Application ${app_id} not finihsed"
    sleep 5
  done
  echo -e "Application ${app_id} has finished"
}
#######################################
# Report whether a YARN application ended with status SUCCEEDED.
#
# Runs `dask-yarn status` once and checks the fourth whitespace-separated
# column of its output for the exact value SUCCEEDED. The matched value
# is printed, followed by a SUCCESS or PANIC line.
# Arguments: $1 - application id to inspect
# Outputs:   result messages on stdout
# Returns:   0 if SUCCEEDED was found, 1 otherwise
#######################################
check_succeeded() {
  # local: do not clobber a caller's global ID
  local app_id=$1
  if dask-yarn status "${app_id}" 2>/dev/null |
    awk '$4 == "SUCCEEDED" { print $4; ok = 1 } END { exit !ok }'; then
    echo -e "SUCCESS: Application ${app_id} SUCCEEDED!"
    return 0
  else
    echo -e "PANIC: Application ${app_id} NOT SUCCEEDED"
    return 1
  fi
}
# Three-line start banner on stdout.
printf '%s\n' \
  '----------------------------------------------------------------------' \
  '----------------------- Start Dask Yarn Submit -----------------------' \
  '----------------------------------------------------------------------'

# Settings used when the matching command-line option is not given.
# "not-set" is a placeholder string for the two locations.
DEPLOY_MODE='local'
DASK_YARN_ARGS=''
ENVIRONMENT_URI='not-set'
CODE_LOCATION='not-set'
APP_NAME='dask'
# Reference: https://pretzelhands.com/posts/command-line-flags
#######################################
# Parse the launcher's command-line options into the global settings.
#
# Arguments are consumed from "$1" in a while loop. Iterating with
# `for arg in "$@"` while calling `shift` walks a snapshot of the list,
# so "$2" stops lining up with the current option as soon as an
# unrecognised argument precedes it.
#
# Options:
#   --local-mode               set DEPLOY_MODE=local
#   --remote-mode              set DEPLOY_MODE=remote
#   -n, --name NAME            set APP_NAME
#   -c, --code_location URI    set CODE_LOCATION
#   -v, --environment_uri URI  set ENVIRONMENT_URI
#   -e, --env-var NAME=VALUE   export an environment variable
#   -y, --yarn-args "ARGS"     set DASK_YARN_ARGS
# Anything else is reported on stderr and skipped.
# Globals:   APP_NAME, CODE_LOCATION, ENVIRONMENT_URI, DASK_YARN_ARGS,
#            DEPLOY_MODE (written)
# Arguments: the script's command-line arguments
# Returns:   0 on success, 2 if an option is missing its value, or the
#            status of a failed export
#######################################
parse_args() {
  while (($# > 0)); do
    case "$1" in
      --local-mode)
        DEPLOY_MODE=local
        shift
        ;;
      --remote-mode)
        DEPLOY_MODE=remote
        shift
        ;;
      -n | --name | -c | --code_location | -v | --environment_uri | \
        -e | --env-var | -y | --yarn-args)
        if (($# < 2)); then
          printf 'ERROR: option %s requires a value\n' "$1" >&2
          return 2
        fi
        case "$1" in
          -n | --name) APP_NAME=$2 ;;
          -c | --code_location) CODE_LOCATION=$2 ;;
          -v | --environment_uri) ENVIRONMENT_URI=$2 ;;
          -y | --yarn-args) DASK_YARN_ARGS=$2 ;;
          -e | --env-var)
            # NOTE(security): eval runs the value as shell code, so
            # anything passed to -e/--env-var is executed, not just
            # assigned. Kept for compatibility with values that rely on
            # shell quoting; only pass trusted input here.
            eval "export ${2}" || return
            ;;
        esac
        shift 2 # drop the option name and its value
        ;;
      *)
        printf 'WARNING: ignoring unrecognised argument: %s\n' "$1" >&2
        shift
        ;;
    esac
  done
}
parse_args "$@" || exit "$?"
# Per-application working directory under the user's home.
APP_LOCAL_PATH=${HOME}/dask-submit-launcher/${APP_NAME}

# Log the effective configuration.
echo -e "APP_NAME=${APP_NAME}"
echo -e "CODE_LOCATION=${CODE_LOCATION}"
echo -e "ENVIRONMENT_URI=${ENVIRONMENT_URI}"
echo -e "DEPLOY_MODE=${DEPLOY_MODE}"
echo -e "DASK_YARN_ARGS=${DASK_YARN_ARGS}"

# Fail fast when a required location was never supplied. Otherwise the
# "not-set" placeholder would reach `aws s3 cp` and fail there with an
# unrelated message, after directories had already been created.
for required_setting in CODE_LOCATION ENVIRONMENT_URI; do
  if [[ -z "${!required_setting}" || "${!required_setting}" == "not-set" ]]; then
    echo "ERROR: ${required_setting} must be provided (see -c/--code_location and -v/--environment_uri)" >&2
    exit 1
  fi
done
unset required_setting

echo -e ">>> Creating path to store temporal files in ${APP_LOCAL_PATH}"
# -p creates ${APP_LOCAL_PATH} itself along with the env subdirectory.
mkdir -p "${APP_LOCAL_PATH}/env"

echo -e ">>> Downloading needed files"
aws s3 cp "${CODE_LOCATION}" "${APP_LOCAL_PATH}/run.py"
aws s3 cp "${ENVIRONMENT_URI}" "${APP_LOCAL_PATH}/env.tar.gz"

echo -e ">>> Create environment for master node"
# Unpack the downloaded environment archive and activate it in this shell
# so the commands that follow run with it.
tar --overwrite -xzf "${APP_LOCAL_PATH}/env.tar.gz" -C "${APP_LOCAL_PATH}/env"
# shellcheck disable=SC1091
source "${APP_LOCAL_PATH}/env/bin/activate"
printf '%s\n' '>>> Launching Dask Yarn Submmit'
# Log the command line that is about to run (%b expands backslash escapes
# the same way `echo -e` does).
printf '%b\n' "dask-yarn submit --deploy-mode ${DEPLOY_MODE} --environment ${ENVIRONMENT_URI} ${DASK_YARN_ARGS} ${APP_LOCAL_PATH}/run.py"

# Build the command as an array. DASK_YARN_ARGS is left unquoted on
# purpose so that it splits into separate arguments.
# shellcheck disable=SC2086,SC2206
submit_cmd=(
  dask-yarn submit
  --deploy-mode "${DEPLOY_MODE}"
  --environment "${ENVIRONMENT_URI}"
  ${DASK_YARN_ARGS}
  "${APP_LOCAL_PATH}/run.py"
)

# Capture whatever the submit command writes to stdout and echo it back.
APPLICATION_ID=$("${submit_cmd[@]}")
echo "${APPLICATION_ID}"
# NOTE: the job is only polled when DEPLOY_MODE is "remote" (the value
# tested below). [[ ]] with a quoted expansion keeps the test well-formed
# even when DEPLOY_MODE is empty or contains spaces.
if [[ "${DEPLOY_MODE}" == "remote" ]]; then
  # Wait for the application to end, then turn a non-successful final
  # status into a non-zero exit code for the caller.
  check_finish "${APPLICATION_ID}"
  if ! check_succeeded "${APPLICATION_ID}"; then
    echo ">>> ERROR: Dask Job has failed!!!"
    exit 1
  fi
fi
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment