Created
June 2, 2023 13:41
-
-
Save Steboss89/cfa7a8ea6f5f0c5b9f29ad64fee8981f to your computer and use it in GitHub Desktop.
Example of working KFP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pipeline to retrieve some data on staging
# trustedplatform-pl-staging.italianpromo.fake_dataset
from pathlib import Path | |
from typing import List | |
import kfp | |
import yaml | |
from trustedplatform_kfpvertex.pipelines.components import bigquery, virtual_machine | |
from trustedplatform_kfpvertex.pipelines.utils import compile | |
@kfp.dsl.pipeline(name="rf-example-1", description="Example for running a RF model")
def pipeline(
    vertex_project: str,
    project_region: str,
    vertex_bucket: str,
    cache: bool,
    input_data_path: str,
    instance_name: str,
    container_image: str,
    location: str,
    container_command: str,
    container_args: List[str],
    repo_slug: str,
    artefacts_bucket: str,
):
    """Read input data from BigQuery, then run a training job on a virtual machine.

    Four components are invoked in sequence: a BigQuery read, a VM training
    launch, a VM poll and a VM delete. Each step after the first receives the
    ``.output`` of the step before it.

    ``project_region``, ``vertex_bucket`` and ``cache`` are accepted but not
    referenced anywhere in this body.
    NOTE(review): presumably they are consumed by the compiler/config that
    builds the pipeline; confirm before removing them.
    """
    # Step 1: run the query and ask the BigQuery component to save the results
    # as csv, using input_data_path as the user-supplied output location.
    bq_export = bigquery.read_from_bq(
        sql_query=(
            "SELECT * FROM trustedplatform-pl-staging.fake_dataset.classification_test"
        ),
        project_id=vertex_project,
        save_results=True,
        user_output_data_path=input_data_path,
        output_data_format="csv",
    )

    # The three VM components are all addressed with the same
    # project / instance name / location triple.
    vm_target = {
        "project_id": vertex_project,
        "instance_name": instance_name,
        "location": location,
    }

    # Step 2: launch training on the VM with the given image, command and args.
    # NOTE(review): after_component presumably only enforces ordering after the
    # BigQuery export; confirm against the component definition.
    training_task = virtual_machine.train_on_vm(
        **vm_target,
        model_image=container_image,
        artefacts_bucket=artefacts_bucket,
        container_command=container_command,
        container_args=container_args,
        after_component=bq_export.output,
    )

    # Step 3: poll the machine, handing over the output of the training step
    # as the instance id.
    polling_task = virtual_machine.poll_machine(
        **vm_target,
        repo_slug=repo_slug,
        instance_id=training_task.output,
    )

    # Step 4: delete the machine once the poll step has produced its output.
    # The resulting task is not used further, so it is not bound to a name.
    virtual_machine.delete_machine(
        **vm_target,
        after_component=polling_task.output,
    )
# Parse the Vertex settings from the YAML file; the path is relative, so it is
# resolved against the current working directory.
config_path = Path("vertex_config.yaml")
config = yaml.safe_load(config_path.read_text())

# Hand the pipeline function and the parsed config to the project compiler,
# then build the Vertex artifact from it.
pipe = compile.Compiler(pipeline, config)
pipe.build_vertex_artifact()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment