Skip to content

Instantly share code, notes, and snippets.

@Steboss89
Created June 2, 2023 13:41
Show Gist options
  • Save Steboss89/cfa7a8ea6f5f0c5b9f29ad64fee8981f to your computer and use it in GitHub Desktop.
Save Steboss89/cfa7a8ea6f5f0c5b9f29ad64fee8981f to your computer and use it in GitHub Desktop.
Example of working KFP
# Pipeline to retrieve some data on staging
# trustedplatform-pl-stging.italianpromo.fake_dataset
from pathlib import Path
from typing import List
import kfp
import yaml
from trustedplatform_kfpvertex.pipelines.components import bigquery, virtual_machine
from trustedplatform_kfpvertex.pipelines.utils import compile
@kfp.dsl.pipeline(name="rf-example-1", description="Example for running a RF model")
def pipeline(
vertex_project: str,
project_region: str,
vertex_bucket: str,
cache: bool,
input_data_path: str,
instance_name: str,
container_image: str,
location: str,
container_command: str,
container_args: List[str],
repo_slug: str,
artefacts_bucket: str,
):
r"""KFP pipeline. This pipeline retrieves input data from bigquery
and calls a virtual machine to run a training job
"""
# query the dataset and export it
sql_query = (
"SELECT * FROM trustedplatform-pl-staging.fake_dataset.classification_test"
)
dataset = bigquery.read_from_bq(
sql_query=sql_query,
project_id=vertex_project,
save_results=True,
user_output_data_path=input_data_path,
output_data_format="csv",
)
instance_id = virtual_machine.train_on_vm(
project_id=vertex_project,
instance_name=instance_name,
location=location,
model_image=container_image,
artefacts_bucket=artefacts_bucket,
container_command=container_command,
container_args=container_args,
after_component=dataset.output,
)
poll_id = virtual_machine.poll_machine(
project_id=vertex_project,
instance_name=instance_name,
location=location,
repo_slug=repo_slug,
instance_id=instance_id.output,
)
delete_id = virtual_machine.delete_machine(
project_id=vertex_project,
instance_name=instance_name,
location=location,
after_component=poll_id.output,
)
config = yaml.safe_load(Path("vertex_config.yaml").read_text())
pipe = compile.Compiler(pipeline, config)
pipe.build_vertex_artifact()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment