@philschmid
Created April 19, 2022 11:59
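# Argo Workflow compiled with the Kubeflow Pipelines SDK (kfp 1.8.12) from the
# "xgboost-trainer" sample: end-to-end distributed XGBoost training on Cloud Dataproc.
# Parameters (defaults in spec.arguments at the bottom): output GCS path, GCP project,
# diagnostic_mode (HALT_ON_ERROR) and the number of boosting rounds.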
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: xgboost-trainer-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12, pipelines.kubeflow.org/pipeline_compilation_time: '2022-04-19T13:58:21.551241',
pipelines.kubeflow.org/pipeline_spec: '{"description": "A trainer that does end-to-end
distributed training for XGBoost models.", "inputs": [{"default": "gs://{{kfp-default-bucket}}",
"name": "output", "optional": true}, {"default": "{{kfp-project-id}}", "name":
"project", "optional": true}, {"default": "HALT_ON_ERROR", "name": "diagnostic_mode",
"optional": true}, {"default": "5", "name": "rounds", "optional": true}], "name":
"xgboost-trainer"}'}
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.12}
spec:
entrypoint: xgboost-trainer
templates:
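# confusion-matrix: KFP local confusion_matrix component; reads the Predictor's
# predict_output/part-*.csv and writes confusion-matrix UI metadata and metrics artifacts.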
- name: confusion-matrix
container:
args: [--predictions, '{{inputs.parameters.output}}/{{workflow.uid}}/data/predict_output/part-*.csv',
--target_lambda, '', --output, '{{inputs.parameters.output}}/{{workflow.uid}}/data',
--ui-metadata-output-path, /tmp/outputs/MLPipeline_UI_metadata/data, --metrics-output-path,
/tmp/outputs/MLPipeline_Metrics/data]
command: [python2, /ml/confusion_matrix.py]
image: gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:1.8.0-alpha.0
inputs:
parameters:
- {name: output}
outputs:
artifacts:
- {name: mlpipeline-ui-metadata, path: /tmp/outputs/MLPipeline_UI_metadata/data}
- {name: mlpipeline-metrics, path: /tmp/outputs/MLPipeline_Metrics/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Calculates
confusion matrix", "implementation": {"container": {"args": ["--predictions",
{"inputValue": "Predictions"}, "--target_lambda", {"inputValue": "Target
lambda"}, "--output", {"inputValue": "Output dir"}, "--ui-metadata-output-path",
{"outputPath": "MLPipeline UI metadata"}, "--metrics-output-path", {"outputPath":
"MLPipeline Metrics"}], "command": ["python2", "/ml/confusion_matrix.py"],
"image": "gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:1.8.0-alpha.0"}},
"inputs": [{"description": "GCS path of prediction file pattern.", "name":
"Predictions", "type": "GCSPath"}, {"default": "", "description": "Text
of Python lambda function which computes target value. For example, \"lambda
x: x[''a''] + x[''b'']\". If not set, the input must include a \"target\"
column.", "name": "Target lambda", "type": "String"}, {"description": "GCS
path of the output directory.", "name": "Output dir", "type": "GCSPath"}],
"name": "Confusion matrix", "outputs": [{"name": "MLPipeline UI metadata",
"type": "UI metadata"}, {"name": "MLPipeline Metrics", "type": "Metrics"}]}',
pipelines.kubeflow.org/component_ref: '{"digest": "1c7a9958cddbb4b7cda1daf5cacfc70196a645993c7f43d24987829a65b6767f",
"url": "https://raw.githubusercontent.com/kubeflow/pipelines/1.8.0-alpha.0/components/local/confusion_matrix/component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"Output dir": "{{inputs.parameters.output}}/{{workflow.uid}}/data",
"Predictions": "{{inputs.parameters.output}}/{{workflow.uid}}/data/predict_output/part-*.csv",
"Target lambda": ""}'}
- name: dataproc-create-cluster
container:
args: [--ui_metadata_path, /tmp/outputs/MLPipeline_UI_metadata/data, kfp_component.google.dataproc,
create_cluster, --project_id, '{{inputs.parameters.project}}', --region, us-central1,
--name, 'xgb-{{workflow.uid}}', --name_prefix, '', --initialization_actions,
'["gs://ml-pipeline/sample-pipeline/xgboost/initialization_actions.sh"]',
--config_bucket, '', --image_version, '1.5', --cluster, '', --wait_interval,
'30', --cluster_name_output_path, /tmp/outputs/cluster_name/data]
command: []
env:
- {name: KFP_POD_NAME, value: '{{pod.name}}'}
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_POD_UID
valueFrom:
fieldRef: {fieldPath: metadata.uid}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- name: KFP_RUN_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'}
- name: ENABLE_CACHING
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'}
image: gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3
inputs:
parameters:
- {name: project}
outputs:
artifacts:
- {name: mlpipeline-ui-metadata, path: /tmp/outputs/MLPipeline_UI_metadata/data}
- {name: dataproc-create-cluster-cluster_name, path: /tmp/outputs/cluster_name/data}
metadata:
labels:
add-pod-env: "true"
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Creates
a DataProc cluster under a project.\n", "implementation": {"container":
{"args": ["--ui_metadata_path", {"outputPath": "MLPipeline UI metadata"},
"kfp_component.google.dataproc", "create_cluster", "--project_id", {"inputValue":
"project_id"}, "--region", {"inputValue": "region"}, "--name", {"inputValue":
"name"}, "--name_prefix", {"inputValue": "name_prefix"}, "--initialization_actions",
{"inputValue": "initialization_actions"}, "--config_bucket", {"inputValue":
"config_bucket"}, "--image_version", {"inputValue": "image_version"}, "--cluster",
{"inputValue": "cluster"}, "--wait_interval", {"inputValue": "wait_interval"},
"--cluster_name_output_path", {"outputPath": "cluster_name"}], "env": {"KFP_POD_NAME":
"{{pod.name}}"}, "image": "gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3"}},
"inputs": [{"description": "Required. The ID of the Google Cloud Platform
project that the cluster belongs to.", "name": "project_id", "type": "GCPProjectID"},
{"description": "Required. The Cloud Dataproc region in which to handle
the request.", "name": "region", "type": "GCPRegion"}, {"default": "", "description":
"Optional. The cluster name. Cluster names within a project must be unique.
Names of deleted clusters can be reused", "name": "name", "type": "String"},
{"default": "", "description": "Optional. The prefix of the cluster name.",
"name": "name_prefix", "type": "String"}, {"default": "", "description":
"Optional. List of GCS URIs of executables to execute on each node after
config is completed. By default, executables are run on master and all worker
nodes.", "name": "initialization_actions", "type": "List"}, {"default":
"", "description": "Optional. A Google Cloud Storage bucket used to stage
job dependencies, config files, and job driver console output.", "name":
"config_bucket", "type": "GCSPath"}, {"default": "", "description": "Optional.
The version of software inside the cluster.", "name": "image_version", "type":
"String"}, {"default": "", "description": "Optional. The full cluster config.
See [full details](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster)",
"name": "cluster", "type": "Dict"}, {"default": "30", "description": "Optional.
The wait seconds between polling the operation. Defaults to 30.", "name":
"wait_interval", "type": "Integer"}], "metadata": {"labels": {"add-pod-env":
"true"}}, "name": "dataproc_create_cluster", "outputs": [{"description":
"The cluster name of the created cluster.", "name": "cluster_name", "type":
"String"}, {"name": "MLPipeline UI metadata", "type": "UI metadata"}]}',
pipelines.kubeflow.org/component_ref: '{"digest": "79c566b98983ff3064db6ddb9ee314192758c98b7329594554abd700059195d6",
"url": "https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataproc/create_cluster/component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"cluster": "", "config_bucket":
"", "image_version": "1.5", "initialization_actions": "[\"gs://ml-pipeline/sample-pipeline/xgboost/initialization_actions.sh\"]",
"name": "xgb-{{workflow.uid}}", "name_prefix": "", "project_id": "{{inputs.parameters.project}}",
"region": "us-central1", "wait_interval": "30"}'}
- name: dataproc-delete-cluster
container:
args: [kfp_component.google.dataproc, delete_cluster, --project_id, '{{inputs.parameters.project}}',
--region, us-central1, --name, 'xgb-{{workflow.uid}}', --wait_interval, '30']
command: []
env:
- {name: KFP_POD_NAME, value: '{{pod.name}}'}
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_POD_UID
valueFrom:
fieldRef: {fieldPath: metadata.uid}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- name: KFP_RUN_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'}
- name: ENABLE_CACHING
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'}
image: gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3
inputs:
parameters:
- {name: project}
metadata:
labels:
add-pod-env: "true"
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Deletes
a DataProc cluster.\n", "implementation": {"container": {"args": ["kfp_component.google.dataproc",
"delete_cluster", "--project_id", {"inputValue": "project_id"}, "--region",
{"inputValue": "region"}, "--name", {"inputValue": "name"}, "--wait_interval",
{"inputValue": "wait_interval"}], "env": {"KFP_POD_NAME": "{{pod.name}}"},
"image": "gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3"}}, "inputs": [{"description":
"Required. The ID of the Google Cloud Platform project that the cluster
belongs to.", "name": "project_id", "type": "GCPProjectID"}, {"description":
"Required. The Cloud Dataproc region in which to handle the request.", "name":
"region", "type": "GCPRegion"}, {"description": "Required. The cluster name
to delete.", "name": "name", "type": "String"}, {"default": "30", "description":
"Optional. The wait seconds between polling the operation. Defaults to 30.",
"name": "wait_interval", "type": "Integer"}], "metadata": {"labels": {"add-pod-env":
"true"}}, "name": "dataproc_delete_cluster"}', pipelines.kubeflow.org/component_ref: '{"digest":
"ed957caecff54488362e8a979b5b9041e50d912e34f7022e8474e6205125169b", "url":
"https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataproc/delete_cluster/component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"name": "xgb-{{workflow.uid}}",
"project_id": "{{inputs.parameters.project}}", "region": "us-central1",
"wait_interval": "30"}'}
- name: dataproc-submit-pyspark-job
container:
args: [--ui_metadata_path, /tmp/outputs/MLPipeline_UI_metadata/data, kfp_component.google.dataproc,
submit_pyspark_job, --project_id, '{{inputs.parameters.project}}', --region,
us-central1, --cluster_name, 'xgb-{{workflow.uid}}', --main_python_file_uri,
'gs://ml-pipeline/sample-pipeline/xgboost/analyze_run.py', --args, '["--output",
"{{inputs.parameters.output}}/{{workflow.uid}}/data", "--train", "gs://ml-pipeline/sample-data/sfpd/train.csv",
"--schema", "gs://ml-pipeline/sample-data/sfpd/schema.json"]', --pyspark_job,
'', --job, '', --wait_interval, '30', --job_id_output_path, /tmp/outputs/job_id/data]
command: []
env:
- {name: KFP_POD_NAME, value: '{{pod.name}}'}
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_POD_UID
valueFrom:
fieldRef: {fieldPath: metadata.uid}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- name: KFP_RUN_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'}
- name: ENABLE_CACHING
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'}
image: gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3
inputs:
parameters:
- {name: output}
- {name: project}
outputs:
artifacts:
- {name: mlpipeline-ui-metadata, path: /tmp/outputs/MLPipeline_UI_metadata/data}
- {name: dataproc-submit-pyspark-job-job_id, path: /tmp/outputs/job_id/data}
metadata:
annotations: {pipelines.kubeflow.org/task_display_name: Analyzer, pipelines.kubeflow.org/component_spec: '{"description":
"Submits a Cloud Dataproc job for running Apache PySpark applications on
YARN.", "implementation": {"container": {"args": ["--ui_metadata_path",
{"outputPath": "MLPipeline UI metadata"}, "kfp_component.google.dataproc",
"submit_pyspark_job", "--project_id", {"inputValue": "project_id"}, "--region",
{"inputValue": "region"}, "--cluster_name", {"inputValue": "cluster_name"},
"--main_python_file_uri", {"inputValue": "main_python_file_uri"}, "--args",
{"inputValue": "args"}, "--pyspark_job", {"inputValue": "pyspark_job"},
"--job", {"inputValue": "job"}, "--wait_interval", {"inputValue": "wait_interval"},
"--job_id_output_path", {"outputPath": "job_id"}], "env": {"KFP_POD_NAME":
"{{pod.name}}"}, "image": "gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3"}},
"inputs": [{"description": "Required. The ID of the Google Cloud Platform
project that the cluster belongs to.", "name": "project_id", "type": "GCPProjectID"},
{"description": "Required. The Cloud Dataproc region in which to handle
the request.", "name": "region", "type": "GCPRegion"}, {"description": "Required.
The cluster to run the job.", "name": "cluster_name", "type": "String"},
{"description": "Required. The HCFS URI of the main Python file to use
as the driver. Must be a .py file.", "name": "main_python_file_uri", "type":
"GCSPath"}, {"default": "", "description": "Optional. The arguments to pass
to the driver. Do not include arguments, such as --conf, that can be set
as job properties, since a collision may occur that causes an incorrect
job submission.", "name": "args", "type": "List"}, {"default": "", "description":
"Optional. The full payload of a [PySparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/PySparkJob).",
"name": "pyspark_job", "type": "Dict"}, {"default": "", "description": "Optional.
The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).",
"name": "job", "type": "Dict"}, {"default": "30", "description": "Optional.
The wait seconds between polling the operation. Defaults to 30.", "name":
"wait_interval", "type": "Integer"}], "metadata": {"labels": {"add-pod-env":
"true"}}, "name": "dataproc_submit_pyspark_job", "outputs": [{"description":
"The ID of the created job.", "name": "job_id", "type": "String"}, {"name":
"MLPipeline UI metadata", "type": "UI metadata"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"6b520935d3a2aa8094a09dfe2e4ac64569f6ba4ded73ab3b96466bd987ec238c", "url":
"https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataproc/submit_pyspark_job/component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"args": "[\"--output\", \"{{inputs.parameters.output}}/{{workflow.uid}}/data\",
\"--train\", \"gs://ml-pipeline/sample-data/sfpd/train.csv\", \"--schema\",
\"gs://ml-pipeline/sample-data/sfpd/schema.json\"]", "cluster_name": "xgb-{{workflow.uid}}",
"job": "", "main_python_file_uri": "gs://ml-pipeline/sample-pipeline/xgboost/analyze_run.py",
"project_id": "{{inputs.parameters.project}}", "pyspark_job": "", "region":
"us-central1", "wait_interval": "30"}'}
labels:
add-pod-env: "true"
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
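# dataproc-submit-pyspark-job-2 ("Transformer"): runs transform_run.py, transforming the
# train/eval CSVs with the Analyzer output and "resolution" as the target column.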
- name: dataproc-submit-pyspark-job-2
container:
args: [--ui_metadata_path, /tmp/outputs/MLPipeline_UI_metadata/data, kfp_component.google.dataproc,
submit_pyspark_job, --project_id, '{{inputs.parameters.project}}', --region,
us-central1, --cluster_name, 'xgb-{{workflow.uid}}', --main_python_file_uri,
'gs://ml-pipeline/sample-pipeline/xgboost/transform_run.py', --args, '["--output",
"{{inputs.parameters.output}}/{{workflow.uid}}/data", "--analysis", "{{inputs.parameters.output}}/{{workflow.uid}}/data",
"--target", "resolution", "--train", "gs://ml-pipeline/sample-data/sfpd/train.csv",
"--eval", "gs://ml-pipeline/sample-data/sfpd/eval.csv"]', --pyspark_job,
'', --job, '', --wait_interval, '30', --job_id_output_path, /tmp/outputs/job_id/data]
command: []
env:
- {name: KFP_POD_NAME, value: '{{pod.name}}'}
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_POD_UID
valueFrom:
fieldRef: {fieldPath: metadata.uid}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- name: KFP_RUN_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'}
- name: ENABLE_CACHING
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'}
image: gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3
inputs:
parameters:
- {name: output}
- {name: project}
outputs:
artifacts:
- {name: mlpipeline-ui-metadata, path: /tmp/outputs/MLPipeline_UI_metadata/data}
- {name: dataproc-submit-pyspark-job-2-job_id, path: /tmp/outputs/job_id/data}
metadata:
annotations: {pipelines.kubeflow.org/task_display_name: Transformer, pipelines.kubeflow.org/component_spec: '{"description":
"Submits a Cloud Dataproc job for running Apache PySpark applications on
YARN.", "implementation": {"container": {"args": ["--ui_metadata_path",
{"outputPath": "MLPipeline UI metadata"}, "kfp_component.google.dataproc",
"submit_pyspark_job", "--project_id", {"inputValue": "project_id"}, "--region",
{"inputValue": "region"}, "--cluster_name", {"inputValue": "cluster_name"},
"--main_python_file_uri", {"inputValue": "main_python_file_uri"}, "--args",
{"inputValue": "args"}, "--pyspark_job", {"inputValue": "pyspark_job"},
"--job", {"inputValue": "job"}, "--wait_interval", {"inputValue": "wait_interval"},
"--job_id_output_path", {"outputPath": "job_id"}], "env": {"KFP_POD_NAME":
"{{pod.name}}"}, "image": "gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3"}},
"inputs": [{"description": "Required. The ID of the Google Cloud Platform
project that the cluster belongs to.", "name": "project_id", "type": "GCPProjectID"},
{"description": "Required. The Cloud Dataproc region in which to handle
the request.", "name": "region", "type": "GCPRegion"}, {"description": "Required.
The cluster to run the job.", "name": "cluster_name", "type": "String"},
{"description": "Required. The HCFS URI of the main Python file to use
as the driver. Must be a .py file.", "name": "main_python_file_uri", "type":
"GCSPath"}, {"default": "", "description": "Optional. The arguments to pass
to the driver. Do not include arguments, such as --conf, that can be set
as job properties, since a collision may occur that causes an incorrect
job submission.", "name": "args", "type": "List"}, {"default": "", "description":
"Optional. The full payload of a [PySparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/PySparkJob).",
"name": "pyspark_job", "type": "Dict"}, {"default": "", "description": "Optional.
The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).",
"name": "job", "type": "Dict"}, {"default": "30", "description": "Optional.
The wait seconds between polling the operation. Defaults to 30.", "name":
"wait_interval", "type": "Integer"}], "metadata": {"labels": {"add-pod-env":
"true"}}, "name": "dataproc_submit_pyspark_job", "outputs": [{"description":
"The ID of the created job.", "name": "job_id", "type": "String"}, {"name":
"MLPipeline UI metadata", "type": "UI metadata"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"6b520935d3a2aa8094a09dfe2e4ac64569f6ba4ded73ab3b96466bd987ec238c", "url":
"https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataproc/submit_pyspark_job/component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"args": "[\"--output\", \"{{inputs.parameters.output}}/{{workflow.uid}}/data\",
\"--analysis\", \"{{inputs.parameters.output}}/{{workflow.uid}}/data\",
\"--target\", \"resolution\", \"--train\", \"gs://ml-pipeline/sample-data/sfpd/train.csv\",
\"--eval\", \"gs://ml-pipeline/sample-data/sfpd/eval.csv\"]", "cluster_name":
"xgb-{{workflow.uid}}", "job": "", "main_python_file_uri": "gs://ml-pipeline/sample-pipeline/xgboost/transform_run.py",
"project_id": "{{inputs.parameters.project}}", "pyspark_job": "", "region":
"us-central1", "wait_interval": "30"}'}
labels:
add-pod-env: "true"
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
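# dataproc-submit-spark-job ("Trainer"): runs XGBoostTrainer from the xgboost4j example
# jar with the trainconfcla.json config and {{rounds}} boosting rounds over the
# transformed train/eval splits, writing the model to .../data/train_output.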
- name: dataproc-submit-spark-job
container:
args: [--ui_metadata_path, /tmp/outputs/MLPipeline_UI_metadata/data, kfp_component.google.dataproc,
submit_spark_job, --project_id, '{{inputs.parameters.project}}', --region,
us-central1, --cluster_name, 'xgb-{{workflow.uid}}', --main_jar_file_uri,
'', --main_class, ml.dmlc.xgboost4j.scala.example.spark.XGBoostTrainer, --args,
'["gs://ml-pipeline/sample-data/xgboost-config/trainconfcla.json", "{{inputs.parameters.rounds}}",
"2", "{{inputs.parameters.output}}/{{workflow.uid}}/data", "resolution",
"{{inputs.parameters.output}}/{{workflow.uid}}/data/train/part-*", "{{inputs.parameters.output}}/{{workflow.uid}}/data/eval/part-*",
"{{inputs.parameters.output}}/{{workflow.uid}}/data/train_output"]', --spark_job,
'{"jarFileUris": ["gs://ml-pipeline/sample-pipeline/xgboost/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar"]}',
--job, '', --wait_interval, '30', --job_id_output_path, /tmp/outputs/job_id/data]
command: []
env:
- {name: KFP_POD_NAME, value: '{{pod.name}}'}
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_POD_UID
valueFrom:
fieldRef: {fieldPath: metadata.uid}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- name: KFP_RUN_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'}
- name: ENABLE_CACHING
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'}
image: gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3
inputs:
parameters:
- {name: output}
- {name: project}
- {name: rounds}
outputs:
artifacts:
- {name: mlpipeline-ui-metadata, path: /tmp/outputs/MLPipeline_UI_metadata/data}
- {name: dataproc-submit-spark-job-job_id, path: /tmp/outputs/job_id/data}
metadata:
annotations: {pipelines.kubeflow.org/task_display_name: Trainer, pipelines.kubeflow.org/component_spec: '{"description":
"Submits a Cloud Dataproc job for running Apache Spark applications on YARN.",
"implementation": {"container": {"args": ["--ui_metadata_path", {"outputPath":
"MLPipeline UI metadata"}, "kfp_component.google.dataproc", "submit_spark_job",
"--project_id", {"inputValue": "project_id"}, "--region", {"inputValue":
"region"}, "--cluster_name", {"inputValue": "cluster_name"}, "--main_jar_file_uri",
{"inputValue": "main_jar_file_uri"}, "--main_class", {"inputValue": "main_class"},
"--args", {"inputValue": "args"}, "--spark_job", {"inputValue": "spark_job"},
"--job", {"inputValue": "job"}, "--wait_interval", {"inputValue": "wait_interval"},
"--job_id_output_path", {"outputPath": "job_id"}], "env": {"KFP_POD_NAME":
"{{pod.name}}"}, "image": "gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3"}},
"inputs": [{"description": "Required. The ID of the Google Cloud Platform
project that the cluster belongs to.", "name": "project_id", "type": "GCPProjectID"},
{"description": "Required. The Cloud Dataproc region in which to handle
the request.", "name": "region", "type": "GCPRegion"}, {"description": "Required.
The cluster to run the job.", "name": "cluster_name", "type": "String"},
{"default": "", "description": "The HCFS URI of the jar file that contains
the main class.", "name": "main_jar_file_uri", "type": "GCSPath"}, {"default":
"", "description": "The name of the driver''s main class. The jar file that contains
the class must be in the default CLASSPATH or specified in jarFileUris.",
"name": "main_class", "type": "String"}, {"default": "", "description":
"Optional. The arguments to pass to the driver. Do not include arguments,
such as --conf, that can be set as job properties, since a collision may
occur that causes an incorrect job submission.", "name": "args", "type":
"List"}, {"default": "", "description": "Optional. The full payload of a
[SparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/SparkJob).",
"name": "spark_job", "type": "Dict"}, {"default": "", "description": "Optional.
The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).",
"name": "job", "type": "Dict"}, {"default": "30", "description": "Optional.
The wait seconds between polling the operation. Defaults to 30.", "name":
"wait_interval", "type": "Integer"}], "metadata": {"labels": {"add-pod-env":
"true"}}, "name": "dataproc_submit_spark_job", "outputs": [{"description":
"The ID of the created job.", "name": "job_id", "type": "String"}, {"name":
"MLPipeline UI metadata", "type": "UI metadata"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"607c88233370706414d17449d1c043de8162aeabf45f89c03a22af9e00de09d8", "url":
"https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataproc/submit_spark_job/component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"args": "[\"gs://ml-pipeline/sample-data/xgboost-config/trainconfcla.json\",
\"{{inputs.parameters.rounds}}\", \"2\", \"{{inputs.parameters.output}}/{{workflow.uid}}/data\",
\"resolution\", \"{{inputs.parameters.output}}/{{workflow.uid}}/data/train/part-*\",
\"{{inputs.parameters.output}}/{{workflow.uid}}/data/eval/part-*\", \"{{inputs.parameters.output}}/{{workflow.uid}}/data/train_output\"]",
"cluster_name": "xgb-{{workflow.uid}}", "job": "", "main_class": "ml.dmlc.xgboost4j.scala.example.spark.XGBoostTrainer",
"main_jar_file_uri": "", "project_id": "{{inputs.parameters.project}}",
"region": "us-central1", "spark_job": "{\"jarFileUris\": [\"gs://ml-pipeline/sample-pipeline/xgboost/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar\"]}",
"wait_interval": "30"}'}
labels:
add-pod-env: "true"
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
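# dataproc-submit-spark-job-2 ("Predictor"): runs XGBoostPredictor with the trained model
# from .../data/train_output against the eval split and writes .../data/predict_output.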
- name: dataproc-submit-spark-job-2
container:
args: [--ui_metadata_path, /tmp/outputs/MLPipeline_UI_metadata/data, kfp_component.google.dataproc,
submit_spark_job, --project_id, '{{inputs.parameters.project}}', --region,
us-central1, --cluster_name, 'xgb-{{workflow.uid}}', --main_jar_file_uri,
'', --main_class, ml.dmlc.xgboost4j.scala.example.spark.XGBoostPredictor,
--args, '["{{inputs.parameters.output}}/{{workflow.uid}}/data/train_output",
"{{inputs.parameters.output}}/{{workflow.uid}}/data/eval/part-*", "{{inputs.parameters.output}}/{{workflow.uid}}/data",
"resolution", "{{inputs.parameters.output}}/{{workflow.uid}}/data/predict_output"]',
--spark_job, '{"jarFileUris": ["gs://ml-pipeline/sample-pipeline/xgboost/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar"]}',
--job, '', --wait_interval, '30', --job_id_output_path, /tmp/outputs/job_id/data]
command: []
env:
- {name: KFP_POD_NAME, value: '{{pod.name}}'}
- name: KFP_POD_NAME
valueFrom:
fieldRef: {fieldPath: metadata.name}
- name: KFP_POD_UID
valueFrom:
fieldRef: {fieldPath: metadata.uid}
- name: KFP_NAMESPACE
valueFrom:
fieldRef: {fieldPath: metadata.namespace}
- name: WORKFLOW_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'}
- name: KFP_RUN_ID
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'}
- name: ENABLE_CACHING
valueFrom:
fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'}
image: gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3
inputs:
parameters:
- {name: output}
- {name: project}
outputs:
artifacts:
- {name: mlpipeline-ui-metadata, path: /tmp/outputs/MLPipeline_UI_metadata/data}
- {name: dataproc-submit-spark-job-2-job_id, path: /tmp/outputs/job_id/data}
metadata:
annotations: {pipelines.kubeflow.org/task_display_name: Predictor, pipelines.kubeflow.org/component_spec: '{"description":
"Submits a Cloud Dataproc job for running Apache Spark applications on YARN.",
"implementation": {"container": {"args": ["--ui_metadata_path", {"outputPath":
"MLPipeline UI metadata"}, "kfp_component.google.dataproc", "submit_spark_job",
"--project_id", {"inputValue": "project_id"}, "--region", {"inputValue":
"region"}, "--cluster_name", {"inputValue": "cluster_name"}, "--main_jar_file_uri",
{"inputValue": "main_jar_file_uri"}, "--main_class", {"inputValue": "main_class"},
"--args", {"inputValue": "args"}, "--spark_job", {"inputValue": "spark_job"},
"--job", {"inputValue": "job"}, "--wait_interval", {"inputValue": "wait_interval"},
"--job_id_output_path", {"outputPath": "job_id"}], "env": {"KFP_POD_NAME":
"{{pod.name}}"}, "image": "gcr.io/ml-pipeline/ml-pipeline-gcp:1.7.0-rc.3"}},
"inputs": [{"description": "Required. The ID of the Google Cloud Platform
project that the cluster belongs to.", "name": "project_id", "type": "GCPProjectID"},
{"description": "Required. The Cloud Dataproc region in which to handle
the request.", "name": "region", "type": "GCPRegion"}, {"description": "Required.
The cluster to run the job.", "name": "cluster_name", "type": "String"},
{"default": "", "description": "The HCFS URI of the jar file that contains
the main class.", "name": "main_jar_file_uri", "type": "GCSPath"}, {"default":
"", "description": "The name of the driver''s main class. The jar file that contains
the class must be in the default CLASSPATH or specified in jarFileUris.",
"name": "main_class", "type": "String"}, {"default": "", "description":
"Optional. The arguments to pass to the driver. Do not include arguments,
such as --conf, that can be set as job properties, since a collision may
occur that causes an incorrect job submission.", "name": "args", "type":
"List"}, {"default": "", "description": "Optional. The full payload of a
[SparkJob](https://cloud.google.com/dataproc/docs/reference/rest/v1/SparkJob).",
"name": "spark_job", "type": "Dict"}, {"default": "", "description": "Optional.
The full payload of a [Dataproc job](https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs).",
"name": "job", "type": "Dict"}, {"default": "30", "description": "Optional.
The wait seconds between polling the operation. Defaults to 30.", "name":
"wait_interval", "type": "Integer"}], "metadata": {"labels": {"add-pod-env":
"true"}}, "name": "dataproc_submit_spark_job", "outputs": [{"description":
"The ID of the created job.", "name": "job_id", "type": "String"}, {"name":
"MLPipeline UI metadata", "type": "UI metadata"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"607c88233370706414d17449d1c043de8162aeabf45f89c03a22af9e00de09d8", "url":
"https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataproc/submit_spark_job/component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"args": "[\"{{inputs.parameters.output}}/{{workflow.uid}}/data/train_output\",
\"{{inputs.parameters.output}}/{{workflow.uid}}/data/eval/part-*\", \"{{inputs.parameters.output}}/{{workflow.uid}}/data\",
\"resolution\", \"{{inputs.parameters.output}}/{{workflow.uid}}/data/predict_output\"]",
"cluster_name": "xgb-{{workflow.uid}}", "job": "", "main_class": "ml.dmlc.xgboost4j.scala.example.spark.XGBoostPredictor",
"main_jar_file_uri": "", "project_id": "{{inputs.parameters.project}}",
"region": "us-central1", "spark_job": "{\"jarFileUris\": [\"gs://ml-pipeline/sample-pipeline/xgboost/xgboost4j-example-0.8-SNAPSHOT-jar-with-dependencies.jar\"]}",
"wait_interval": "30"}'}
labels:
add-pod-env: "true"
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
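# exit-handler-1: DAG executed inside the exit handler. Task order:
# create-cluster -> Analyzer -> Transformer -> Trainer -> Predictor,
# then confusion-matrix and roc-curve both consume the Predictor output.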
- name: exit-handler-1
inputs:
parameters:
- {name: output}
- {name: project}
- {name: rounds}
dag:
tasks:
- name: confusion-matrix
template: confusion-matrix
dependencies: [dataproc-submit-spark-job-2]
arguments:
parameters:
- {name: output, value: '{{inputs.parameters.output}}'}
- name: dataproc-create-cluster
template: dataproc-create-cluster
arguments:
parameters:
- {name: project, value: '{{inputs.parameters.project}}'}
- name: dataproc-submit-pyspark-job
template: dataproc-submit-pyspark-job
dependencies: [dataproc-create-cluster]
arguments:
parameters:
- {name: output, value: '{{inputs.parameters.output}}'}
- {name: project, value: '{{inputs.parameters.project}}'}
- name: dataproc-submit-pyspark-job-2
template: dataproc-submit-pyspark-job-2
dependencies: [dataproc-submit-pyspark-job]
arguments:
parameters:
- {name: output, value: '{{inputs.parameters.output}}'}
- {name: project, value: '{{inputs.parameters.project}}'}
- name: dataproc-submit-spark-job
template: dataproc-submit-spark-job
dependencies: [dataproc-submit-pyspark-job-2]
arguments:
parameters:
- {name: output, value: '{{inputs.parameters.output}}'}
- {name: project, value: '{{inputs.parameters.project}}'}
- {name: rounds, value: '{{inputs.parameters.rounds}}'}
- name: dataproc-submit-spark-job-2
template: dataproc-submit-spark-job-2
dependencies: [dataproc-submit-spark-job]
arguments:
parameters:
- {name: output, value: '{{inputs.parameters.output}}'}
- {name: project, value: '{{inputs.parameters.project}}'}
- name: roc-curve
template: roc-curve
dependencies: [dataproc-submit-spark-job-2]
arguments:
parameters:
- {name: output, value: '{{inputs.parameters.output}}'}
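# roc-curve: KFP local ROC component; scores the Predictor output with "ACTION" as the
# positive class / score column and emits ROC UI metadata and metrics.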
- name: roc-curve
container:
args: [--predictions, '{{inputs.parameters.output}}/{{workflow.uid}}/data/predict_output/part-*.csv',
--trueclass, ACTION, --true_score_column, ACTION, --target_lambda, '', --output,
'{{inputs.parameters.output}}/{{workflow.uid}}/data', --ui-metadata-output-path,
/tmp/outputs/MLPipeline_UI_metadata/data, --metrics-output-path, /tmp/outputs/MLPipeline_Metrics/data]
command: [python2, /ml/roc.py]
image: gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:1.8.0-alpha.0
inputs:
parameters:
- {name: output}
outputs:
artifacts:
- {name: mlpipeline-ui-metadata, path: /tmp/outputs/MLPipeline_UI_metadata/data}
- {name: mlpipeline-metrics, path: /tmp/outputs/MLPipeline_Metrics/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Calculates
Receiver Operating Characteristic curve. See https://en.wikipedia.org/wiki/Receiver_operating_characteristic",
"implementation": {"container": {"args": ["--predictions", {"inputValue":
"Predictions dir"}, "--trueclass", {"inputValue": "True class"}, "--true_score_column",
{"inputValue": "True score column"}, "--target_lambda", {"inputValue": "Target
lambda"}, "--output", {"inputValue": "Output dir"}, "--ui-metadata-output-path",
{"outputPath": "MLPipeline UI metadata"}, "--metrics-output-path", {"outputPath":
"MLPipeline Metrics"}], "command": ["python2", "/ml/roc.py"], "image": "gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:1.8.0-alpha.0"}},
"inputs": [{"description": "GCS path of prediction file pattern.", "name":
"Predictions dir", "type": "GCSPath"}, {"default": "true", "description":
"The true class label for the sample. Default is \"true\".", "name": "True
class", "type": "String"}, {"default": "true", "description": "The name
of the column for positive probability.", "name": "True score column", "type":
"String"}, {"default": "", "description": "Text of Python lambda function
which returns boolean value indicating whether the classification result
is correct.\\nFor example, \"lambda x: x[''a''] and x[''b'']\". If missing,
input must have a \"target\" column.", "name": "Target lambda", "type":
"String"}, {"description": "GCS path of the output directory.", "name":
"Output dir", "type": "GCSPath"}], "name": "ROC curve", "outputs": [{"name":
"MLPipeline UI metadata", "type": "UI metadata"}, {"name": "MLPipeline Metrics",
"type": "Metrics"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"ebbe55a0df89065a712a420ae42d7a4aa3f66fc3835e1273ff9ce53fad1392f1", "url":
"https://raw.githubusercontent.com/kubeflow/pipelines/1.8.0-alpha.0/components/local/roc/component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"Output dir": "{{inputs.parameters.output}}/{{workflow.uid}}/data",
"Predictions dir": "{{inputs.parameters.output}}/{{workflow.uid}}/data/predict_output/part-*.csv",
"Target lambda": "", "True class": "ACTION", "True score column": "ACTION"}'}
- name: run-diagnose-me
container:
args: [--bucket, '{{inputs.parameters.output}}', --execution-mode, '{{inputs.parameters.diagnostic_mode}}',
--project-id, '{{inputs.parameters.project}}', --target-apis, dataproc.googleapis.com,
--quota-check, '[{"metric": "CPUS", "quota_needed": 12.0, "region": "us-central1"}]',
'----output-paths', /tmp/outputs/bucket/data, /tmp/outputs/project_id/data]
command:
- python3
- -u
- -c
- |
from typing import NamedTuple
def run_diagnose_me(
bucket: str,
execution_mode: str,
project_id: str,
target_apis: str,
quota_check: list = None,
) -> NamedTuple('Outputs', [('bucket', str), ('project_id', str)]):
""" Performs environment verification specific to this pipeline.
args:
bucket:
string name of the bucket to be checked. Must be of the format
gs://bucket_root/any/path/here/is/ignored where any path beyond root
is ignored.
execution_mode:
If set to HALT_ON_ERROR will case any error to raise an exception.
This is intended to stop the data processing of a pipeline. Can set
to False to only report Errors/Warnings.
project_id:
GCP project ID which is assumed to be the project under which
current pod is executing.
target_apis:
String consisting of a comma separated list of apis to be verified.
quota_check:
List of entries describing how much quota is required. Each entry
has three fields: region, metric and quota_needed. All
string-typed.
Raises:
RuntimeError: If configuration is not setup properly and
HALT_ON_ERROR flag is set.
"""
# Installing pip3 and kfp, since the base image 'google/cloud-sdk:279.0.0'
# does not come with pip3 pre-installed.
import subprocess
subprocess.run([
'curl', 'https://bootstrap.pypa.io/get-pip.py', '-o', 'get-pip.py'
],
capture_output=True)
subprocess.run(['apt-get', 'install', 'python3-distutils', '--yes'],
capture_output=True)
subprocess.run(['python3', 'get-pip.py'], capture_output=True)
subprocess.run(['python3', '-m', 'pip', 'install', 'kfp>=0.1.31', '--quiet'],
capture_output=True)
import sys
from kfp.cli.diagnose_me import gcp
config_error_observed = False
quota_list = gcp.get_gcp_configuration(
gcp.Commands.GET_QUOTAS, human_readable=False
)
if quota_list.has_error:
print('Failed to retrieve project quota with error %s\n' % (quota_list.stderr))
config_error_observed = True
else:
# Check quota.
quota_dict = {} # Mapping from region to dict[metric, available]
for region_quota in quota_list.json_output:
quota_dict[region_quota['name']] = {}
for quota in region_quota['quotas']:
quota_dict[region_quota['name']][quota['metric']
] = quota['limit'] - quota['usage']
quota_check = [] or quota_check
for single_check in quota_check:
if single_check['region'] not in quota_dict:
print(
'Regional quota for %s does not exist in current project.\n' %
(single_check['region'])
)
config_error_observed = True
else:
if quota_dict[single_check['region']][single_check['metric']
] < single_check['quota_needed']:
print(
'Insufficient quota observed for %s at %s: %s is needed but only %s is available.\n'
% (
single_check['metric'], single_check['region'],
str(single_check['quota_needed']
), str(quota_dict[single_check['region']][single_check['metric']])
)
)
config_error_observed = True
# Get the project ID
# from project configuration
project_config = gcp.get_gcp_configuration(
gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False
)
if not project_config.has_error:
auth_project_id = project_config.parsed_output['core']['project']
print(
'GCP credentials are configured with access to project: %s ...\n' %
(project_id)
)
print('Following account(s) are active under this pipeline:\n')
subprocess.run(['gcloud', 'auth', 'list', '--format', 'json'])
print('\n')
else:
print(
'Project configuration is not accessible with error %s\n' %
(project_config.stderr),
file=sys.stderr
)
config_error_observed = True
if auth_project_id != project_id:
print(
'User provided project ID %s does not match the configuration %s\n' %
(project_id, auth_project_id),
file=sys.stderr
)
config_error_observed = True
# Get project buckets
get_project_bucket_results = gcp.get_gcp_configuration(
gcp.Commands.GET_STORAGE_BUCKETS, human_readable=False
)
if get_project_bucket_results.has_error:
print(
'could not retrieve project buckets with error: %s' %
(get_project_bucket_results.stderr),
file=sys.stderr
)
config_error_observed = True
# Get the root of the user provided bucket i.e. gs://root.
bucket_root = '/'.join(bucket.split('/')[0:3])
print(
'Checking to see if the provided GCS bucket\n %s\nis accessible ...\n' %
(bucket)
)
if bucket_root in get_project_bucket_results.json_output:
print(
'Provided bucket \n %s\nis accessible within the project\n %s\n' %
(bucket, project_id)
)
else:
print(
'Could not find the bucket %s in project %s' % (bucket, project_id) +
'Please verify that you have provided the correct GCS bucket name.\n' +
'Only the following buckets are visible in this project:\n%s' %
(get_project_bucket_results.parsed_output),
file=sys.stderr
)
config_error_observed = True
# Verify APIs that are required are enabled
api_config_results = gcp.get_gcp_configuration(gcp.Commands.GET_APIS)
api_status = {}
if api_config_results.has_error:
print(
'could not retrieve API status with error: %s' %
(api_config_results.stderr),
file=sys.stderr
)
config_error_observed = True
print('Checking APIs status ...')
for item in api_config_results.parsed_output:
api_status[item['config']['name']] = item['state']
# printing the results in stdout for logging purposes
print('%s %s' % (item['config']['name'], item['state']))
# Check if target apis are enabled
api_check_results = True
for api in target_apis.replace(' ', '').split(','):
if 'ENABLED' != api_status.get(api, 'DISABLED'):
api_check_results = False
print(
'API \"%s\" is not accessible or not enabled. To enable this api go to '
% (api) +
'https://console.cloud.google.com/apis/library/%s?project=%s' %
(api, project_id),
file=sys.stderr
)
config_error_observed = True
if 'HALT_ON_ERROR' in execution_mode and config_error_observed:
raise RuntimeError(
'There was an error in your environment configuration.\n' +
'Note that resolving such issues generally require a deep knowledge of Kubernetes.\n'
+ '\n' +
'We highly recommend that you recreate the cluster and check "Allow access ..." \n'
+
'checkbox during cluster creation to have the cluster configured automatically.\n'
+
'For more information on this and other troubleshooting instructions refer to\n'
+ 'our troubleshooting guide.\n' + '\n' +
'If you have intentionally modified the cluster configuration, you may\n'
+
'bypass this error by removing the execution_mode HALT_ON_ERROR flag.\n'
)
return (project_id, bucket)
def _serialize_str(str_value: str) -> str:
if not isinstance(str_value, str):
raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value))))
return str_value
import json
import argparse
_parser = argparse.ArgumentParser(prog='Run diagnose me', description='Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored where any path beyond root\n is ignored.\n execution_mode:\n If set to HALT_ON_ERROR will case any error to raise an exception.\n This is intended to stop the data processing of a pipeline. Can set\n to False to only report Errors/Warnings.\n project_id:\n GCP project ID which is assumed to be the project under which\n current pod is executing.\n target_apis:\n String consisting of a comma separated list of apis to be verified.\n quota_check:\n List of entries describing how much quota is required. Each entry\n has three fields: region, metric and quota_needed. All\n string-typed.\n Raises:\n RuntimeError: If configuration is not setup properly and\n HALT_ON_ERROR flag is set.')
_parser.add_argument("--bucket", dest="bucket", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--execution-mode", dest="execution_mode", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--project-id", dest="project_id", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--target-apis", dest="target_apis", type=str, required=True, default=argparse.SUPPRESS)
_parser.add_argument("--quota-check", dest="quota_check", type=json.loads, required=False, default=argparse.SUPPRESS)
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2)
_parsed_args = vars(_parser.parse_args())
_output_files = _parsed_args.pop("_output_paths", [])
_outputs = run_diagnose_me(**_parsed_args)
if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str):
_outputs = [_outputs]
_output_serializers = [
_serialize_str,
_serialize_str,
]
import os
for idx, output_file in enumerate(_output_files):
try:
os.makedirs(os.path.dirname(output_file))
except OSError:
pass
with open(output_file, 'w') as f:
f.write(_output_serializers[idx](_outputs[idx]))
image: google/cloud-sdk:279.0.0
inputs:
parameters:
- {name: diagnostic_mode}
- {name: output}
- {name: project}
outputs:
artifacts:
- {name: run-diagnose-me-bucket, path: /tmp/outputs/bucket/data}
- {name: run-diagnose-me-project_id, path: /tmp/outputs/project_id/data}
metadata:
labels:
pipelines.kubeflow.org/kfp_sdk_version: 1.8.12
pipelines.kubeflow.org/pipeline-sdk-type: kfp
pipelines.kubeflow.org/enable_caching: "true"
annotations: {pipelines.kubeflow.org/component_spec: '{"description": "Performs
environment verification specific to this pipeline.\n\n args:\n bucket:\n string
name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored
where any path beyond root\n is ignored.\n execution_mode:\n If
set to HALT_ON_ERROR will case any error to raise an exception.\n This
is intended to stop the data processing of a pipeline. Can set\n to
False to only report Errors/Warnings.\n project_id:\n GCP
project ID which is assumed to be the project under which\n current
pod is executing.\n target_apis:\n String consisting
of a comma separated list of apis to be verified.\n quota_check:\n List
of entries describing how much quota is required. Each entry\n has
three fields: region, metric and quota_needed. All\n string-typed.\n Raises:\n RuntimeError:
If configuration is not setup properly and\n HALT_ON_ERROR flag
is set.", "implementation": {"container": {"args": ["--bucket", {"inputValue":
"bucket"}, "--execution-mode", {"inputValue": "execution_mode"}, "--project-id",
{"inputValue": "project_id"}, "--target-apis", {"inputValue": "target_apis"},
{"if": {"cond": {"isPresent": "quota_check"}, "then": ["--quota-check",
{"inputValue": "quota_check"}]}}, "----output-paths", {"outputPath": "bucket"},
{"outputPath": "project_id"}], "command": ["python3", "-u", "-c", "from
typing import NamedTuple\n\ndef run_diagnose_me(\n bucket: str,\n execution_mode:
str,\n project_id: str,\n target_apis: str,\n quota_check: list
= None,\n) -> NamedTuple(''Outputs'', [(''bucket'', str), (''project_id'',
str)]):\n \"\"\" Performs environment verification specific to this pipeline.\n\n args:\n bucket:\n string
name of the bucket to be checked. Must be of the format\n gs://bucket_root/any/path/here/is/ignored
where any path beyond root\n is ignored.\n execution_mode:\n If
set to HALT_ON_ERROR will case any error to raise an exception.\n This
is intended to stop the data processing of a pipeline. Can set\n to
False to only report Errors/Warnings.\n project_id:\n GCP
project ID which is assumed to be the project under which\n current
pod is executing.\n target_apis:\n String consisting
of a comma separated list of apis to be verified.\n quota_check:\n List
of entries describing how much quota is required. Each entry\n has
three fields: region, metric and quota_needed. All\n string-typed.\n Raises:\n RuntimeError:
If configuration is not setup properly and\n HALT_ON_ERROR flag
is set.\n \"\"\"\n\n # Installing pip3 and kfp, since the base image
''google/cloud-sdk:279.0.0''\n # does not come with pip3 pre-installed.\n import
subprocess\n subprocess.run([\n ''curl'', ''https://bootstrap.pypa.io/get-pip.py'',
''-o'', ''get-pip.py''\n ],\n capture_output=True)\n subprocess.run([''apt-get'',
''install'', ''python3-distutils'', ''--yes''],\n capture_output=True)\n subprocess.run([''python3'',
''get-pip.py''], capture_output=True)\n subprocess.run([''python3'', ''-m'',
''pip'', ''install'', ''kfp>=0.1.31'', ''--quiet''],\n capture_output=True)\n\n import
sys\n from kfp.cli.diagnose_me import gcp\n\n config_error_observed =
False\n\n quota_list = gcp.get_gcp_configuration(\n gcp.Commands.GET_QUOTAS,
human_readable=False\n )\n\n if quota_list.has_error:\n print(''Failed
to retrieve project quota with error %s\\n'' % (quota_list.stderr))\n config_error_observed
= True\n else:\n # Check quota.\n quota_dict = {} # Mapping from
region to dict[metric, available]\n for region_quota in quota_list.json_output:\n quota_dict[region_quota[''name'']]
= {}\n for quota in region_quota[''quotas'']:\n quota_dict[region_quota[''name'']][quota[''metric'']\n ]
= quota[''limit''] - quota[''usage'']\n\n quota_check = [] or quota_check\n for
single_check in quota_check:\n if single_check[''region''] not in quota_dict:\n print(\n ''Regional
quota for %s does not exist in current project.\\n'' %\n (single_check[''region''])\n )\n config_error_observed
= True\n else:\n if quota_dict[single_check[''region'']][single_check[''metric'']\n ]
< single_check[''quota_needed'']:\n print(\n ''Insufficient
quota observed for %s at %s: %s is needed but only %s is available.\\n''\n %
(\n single_check[''metric''], single_check[''region''],\n str(single_check[''quota_needed'']\n ),
str(quota_dict[single_check[''region'']][single_check[''metric'']])\n )\n )\n config_error_observed
= True\n\n # Get the project ID\n # from project configuration\n project_config
= gcp.get_gcp_configuration(\n gcp.Commands.GET_GCLOUD_DEFAULT, human_readable=False\n )\n if
not project_config.has_error:\n auth_project_id = project_config.parsed_output[''core''][''project'']\n print(\n ''GCP
credentials are configured with access to project: %s ...\\n'' %\n (project_id)\n )\n print(''Following
account(s) are active under this pipeline:\\n'')\n subprocess.run([''gcloud'',
''auth'', ''list'', ''--format'', ''json''])\n print(''\\n'')\n else:\n print(\n ''Project
configuration is not accessible with error %s\\n'' %\n (project_config.stderr),\n file=sys.stderr\n )\n config_error_observed
= True\n\n if auth_project_id != project_id:\n print(\n ''User
provided project ID %s does not match the configuration %s\\n'' %\n (project_id,
auth_project_id),\n file=sys.stderr\n )\n config_error_observed
= True\n\n # Get project buckets\n get_project_bucket_results = gcp.get_gcp_configuration(\n gcp.Commands.GET_STORAGE_BUCKETS,
human_readable=False\n )\n\n if get_project_bucket_results.has_error:\n print(\n ''could
not retrieve project buckets with error: %s'' %\n (get_project_bucket_results.stderr),\n file=sys.stderr\n )\n config_error_observed
= True\n\n # Get the root of the user provided bucket i.e. gs://root.\n bucket_root
= ''/''.join(bucket.split(''/'')[0:3])\n\n print(\n ''Checking to
see if the provided GCS bucket\\n %s\\nis accessible ...\\n'' %\n (bucket)\n )\n\n if
bucket_root in get_project_bucket_results.json_output:\n print(\n ''Provided
bucket \\n %s\\nis accessible within the project\\n %s\\n'' %\n (bucket,
project_id)\n )\n\n else:\n print(\n ''Could not find the
bucket %s in project %s'' % (bucket, project_id) +\n ''Please verify
that you have provided the correct GCS bucket name.\\n'' +\n ''Only
the following buckets are visible in this project:\\n%s'' %\n (get_project_bucket_results.parsed_output),\n file=sys.stderr\n )\n config_error_observed
= True\n\n # Verify APIs that are required are enabled\n api_config_results
= gcp.get_gcp_configuration(gcp.Commands.GET_APIS)\n\n api_status = {}\n\n if
api_config_results.has_error:\n print(\n ''could not retrieve
API status with error: %s'' %\n (api_config_results.stderr),\n file=sys.stderr\n )\n config_error_observed
= True\n\n print(''Checking APIs status ...'')\n for item in api_config_results.parsed_output:\n api_status[item[''config''][''name'']]
= item[''state'']\n # printing the results in stdout for logging purposes\n print(''%s
%s'' % (item[''config''][''name''], item[''state'']))\n\n # Check if target
apis are enabled\n api_check_results = True\n for api in target_apis.replace(''
'', '''').split('',''):\n if ''ENABLED'' != api_status.get(api, ''DISABLED''):\n api_check_results
= False\n print(\n ''API \\\"%s\\\" is not accessible or not
enabled. To enable this api go to ''\n % (api) +\n ''https://console.cloud.google.com/apis/library/%s?project=%s''
%\n (api, project_id),\n file=sys.stderr\n )\n config_error_observed
= True\n\n if ''HALT_ON_ERROR'' in execution_mode and config_error_observed:\n raise
RuntimeError(\n ''There was an error in your environment configuration.\\n''
+\n ''Note that resolving such issues generally require a deep knowledge
of Kubernetes.\\n''\n + ''\\n'' +\n ''We highly recommend
that you recreate the cluster and check \"Allow access ...\" \\n''\n +\n ''checkbox
during cluster creation to have the cluster configured automatically.\\n''\n +\n ''For
more information on this and other troubleshooting instructions refer to\\n''\n +
''our troubleshooting guide.\\n'' + ''\\n'' +\n ''If you have intentionally
modified the cluster configuration, you may\\n''\n +\n ''bypass
this error by removing the execution_mode HALT_ON_ERROR flag.\\n''\n )\n\n return
(project_id, bucket)\n\ndef _serialize_str(str_value: str) -> str:\n if
not isinstance(str_value, str):\n raise TypeError(''Value \"{}\"
has type \"{}\" instead of str.''.format(str(str_value), str(type(str_value))))\n return
str_value\n\nimport json\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Run
diagnose me'', description=''Performs environment verification specific
to this pipeline.\\n\\n args:\\n bucket:\\n string
name of the bucket to be checked. Must be of the format\\n gs://bucket_root/any/path/here/is/ignored
where any path beyond root\\n is ignored.\\n execution_mode:\\n If
set to HALT_ON_ERROR will case any error to raise an exception.\\n This
is intended to stop the data processing of a pipeline. Can set\\n to
False to only report Errors/Warnings.\\n project_id:\\n GCP
project ID which is assumed to be the project under which\\n current
pod is executing.\\n target_apis:\\n String consisting
of a comma separated list of apis to be verified.\\n quota_check:\\n List
of entries describing how much quota is required. Each entry\\n has
three fields: region, metric and quota_needed. All\\n string-typed.\\n Raises:\\n RuntimeError:
If configuration is not setup properly and\\n HALT_ON_ERROR flag
is set.'')\n_parser.add_argument(\"--bucket\", dest=\"bucket\", type=str,
required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--execution-mode\",
dest=\"execution_mode\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--project-id\",
dest=\"project_id\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--target-apis\",
dest=\"target_apis\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"--quota-check\",
dest=\"quota_check\", type=json.loads, required=False, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\",
dest=\"_output_paths\", type=str, nargs=2)\n_parsed_args = vars(_parser.parse_args())\n_output_files
= _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = run_diagnose_me(**_parsed_args)\n\nif
not hasattr(_outputs, ''__getitem__'') or isinstance(_outputs, str):\n _outputs
= [_outputs]\n\n_output_serializers = [\n _serialize_str,\n _serialize_str,\n\n]\n\nimport
os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except
OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"],
"image": "google/cloud-sdk:279.0.0"}}, "inputs": [{"name": "bucket", "type":
"String"}, {"name": "execution_mode", "type": "String"}, {"name": "project_id",
"type": "String"}, {"name": "target_apis", "type": "String"}, {"name": "quota_check",
"optional": true, "type": "JsonArray"}], "name": "Run diagnose me", "outputs":
[{"name": "bucket", "type": "String"}, {"name": "project_id", "type": "String"}]}',
pipelines.kubeflow.org/component_ref: '{"digest": "433e542431d41e00c54a033b67c86d0ec5432e11113ed3704a5779fcc598d429",
"url": "https://raw.githubusercontent.com/kubeflow/pipelines/566dddfdfc0a6a725b6e50ea85e73d8d5578bbb9/components/diagnostics/diagnose_me/component.yaml"}',
pipelines.kubeflow.org/arguments.parameters: '{"bucket": "{{inputs.parameters.output}}",
"execution_mode": "{{inputs.parameters.diagnostic_mode}}", "project_id":
"{{inputs.parameters.project}}", "quota_check": "[{\"metric\": \"CPUS\",
\"quota_needed\": 12.0, \"region\": \"us-central1\"}]", "target_apis": "dataproc.googleapis.com"}'}
- name: xgboost-trainer
inputs:
parameters:
- {name: diagnostic_mode}
- {name: output}
- {name: project}
- {name: rounds}
dag:
tasks:
- name: exit-handler-1
template: exit-handler-1
dependencies: [run-diagnose-me]
arguments:
parameters:
- {name: output, value: '{{inputs.parameters.output}}'}
- {name: project, value: '{{inputs.parameters.project}}'}
- {name: rounds, value: '{{inputs.parameters.rounds}}'}
- name: run-diagnose-me
template: run-diagnose-me
arguments:
parameters:
- {name: diagnostic_mode, value: '{{inputs.parameters.diagnostic_mode}}'}
- {name: output, value: '{{inputs.parameters.output}}'}
- {name: project, value: '{{inputs.parameters.project}}'}
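# Workflow-level parameter defaults. gs://{{kfp-default-bucket}} and {{kfp-project-id}}
# are placeholders that are expected to resolve to the hosting project's default bucket
# and project ID when the sample is loaded in Kubeflow Pipelines.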
arguments:
parameters:
- {name: output, value: 'gs://{{kfp-default-bucket}}'}
- {name: project, value: '{{kfp-project-id}}'}
- {name: diagnostic_mode, value: HALT_ON_ERROR}
- {name: rounds, value: '5'}
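# Runs as the pipeline-runner service account; onExit ensures dataproc-delete-cluster
# executes after the DAG completes, whether the run succeeded or failed.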
serviceAccountName: pipeline-runner
onExit: dataproc-delete-cluster