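# Kubeflow Pipelines definition for a basic two-step hybrid pipeline:
# step 1 runs against an on-prem cluster, step 2 runs against a GKE cluster.
# Compiling this file with the KFP SDK produces an Argo workflow archive.
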
import kfp.dsl as dsl
import kfp.onprem as onprem
import kfp.gcp as gcp
from kubernetes import client as k8s_client
pvc_name = 'nfs'  # PVC name; must match the claim name used in the pipeline definition
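
# The pipeline mounts an existing PVC; it does not create one. A minimal,
# illustrative manifest for it (an assumption, not part of this gist) might be:
#
#   apiVersion: v1
#   kind: PersistentVolumeClaim
#   metadata:
#     name: nfs
#     namespace: kubeflow
#   spec:
#     accessModes: ["ReadWriteMany"]
#     resources:
#       requests:
#         storage: 1Gi

# Step 1: run the on-prem deploy image against the source cluster.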
def source_cluster_op(step_name='run-onprem'):
    return dsl.ContainerOp(
        name=step_name,
        image='ciscoai/run-onprem-deploy:1.0',
        arguments=[
            '--platform', 'onprem',  # cloud or onprem
            '--model-path', '/src/saved_model/',
            '--server-name', '{{workflow.name}}',
            '--pvc-name', pvc_name,
            '--namespace', 'kubeflow'
        ]
    )
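
# Step 2: run the cloud deploy image against a GKE cluster. The <INSERT_...>
# values below are placeholders; fill in your GKE cluster name, zone, and
# GCP project before compiling.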
def target_cluster_op(text1, step_name='run-on-cloud'):
    return dsl.ContainerOp(
        name=step_name,
        image='ciscoai/run-on-cloud-deploy:1.0',
        arguments=[
            '--platform', 'cloud',  # cloud or onprem
            '--model-path', '/src/saved_model/',
            '--gke-cluster-name', '<INSERT_GKE_CLUSTER_NAME>',
            '--cluster-zone', '<INSERT_CLUSTER_ZONE>',
            '--project', '<INSERT_PROJECT_NAME>',
            '--server-name', '{{workflow.name}}',
            '--pvc-name', pvc_name,
            '--namespace', 'kubeflow'
        ]
    )
@dsl.pipeline(
    name='Basic Two Step Hybrid Pipeline',
    description='A pipeline that spans across two clouds.'
)
def hybrid_pipeline():
    # k8s volume resources for the workflow
    nfs_pvc = k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name)
    # claim name should be the same as the one used in the deploy steps
    nfs_volume = k8s_client.V1Volume(name='argo-workflow', persistent_volume_claim=nfs_pvc)
    nfs_volume_mount = k8s_client.V1VolumeMount(mount_path='/mnt/', name='argo-workflow')

    # Define individual steps and attach relevant volumes
    source_step = source_cluster_op()
    source_step.add_volume(nfs_volume)
    source_step.add_volume_mount(nfs_volume_mount)
    source_step.set_image_pull_policy('Always')

    target_step = target_cluster_op(source_step.output)
    target_step.apply(gcp.use_gcp_secret('gke-kfp-sa', '/hybrid-kubeflow-pipeline-demo.json'))
    target_step.set_image_pull_policy('Always')

    # Define the workflow DAG
    target_step.after(source_step)
if __name__ == "__main__":
    import kfp.compiler as compiler
    compiler.Compiler().compile(hybrid_pipeline, __file__ + '.tar.gz')
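
# Usage sketch (assumptions: the KFP SDK is installed and a Kubeflow Pipelines
# API endpoint is reachable; the host and filename below are placeholders):
#
#   python <this_file>.py        # compiles the pipeline to <this_file>.py.tar.gz
#
#   # then upload the archive through the Kubeflow Pipelines UI, or e.g.:
#   import kfp
#   client = kfp.Client(host='http://<KFP_HOST>')
#   client.create_run_from_pipeline_package('<this_file>.py.tar.gz', arguments={})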