Geremie / multiple_gcs_bucket_sensor.py
Last active April 3, 2022 23:57
Are you using Cloud Functions for event-based processing?
def poke(self, context):
    # Collect every object currently present in each watched bucket.
    hook = GCSHook(
        gcp_conn_id=self.google_cloud_conn_id,
        delegate_to=self.delegate_to,
        impersonation_chain=self.impersonation_chain,
    )
    for bucket in self.buckets:
        match = [{'bucket': bucket, 'object': blob_name}
                 for blob_name in hook.list(bucket)]
        self._matches.extend(match)
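For context, a minimal sketch of the sensor class this poke method would belong to, assuming Airflow 2.x and the Google provider package (the class name, constructor fields, and defaults are assumptions, not the gist's actual code):

from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.sensors.base import BaseSensorOperator

class GCSMultipleBucketsSensor(BaseSensorOperator):
    def __init__(self, buckets, google_cloud_conn_id='google_cloud_default',
                 delegate_to=None, impersonation_chain=None, **kwargs):
        super().__init__(**kwargs)
        self.buckets = buckets
        self.google_cloud_conn_id = google_cloud_conn_id
        self.delegate_to = delegate_to
        self.impersonation_chain = impersonation_chain
        self._matches = []  # populated by poke() above

A poke implementation would typically end by returning a boolean, e.g. bool(self._matches), so the sensor stops rescheduling once matching objects are found.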
Geremie / produce_small_data_using_specification.py
Created February 28, 2021 19:34
Never struggle again to share data between your Kubeflow Pipelines components
name: Deployment Checker
description: The component checks whether to deploy a new model version or not
inputs:
- name: project_id
  description: ''
  type: String
- name: model_name
  description: ''
  type: String
- name: model_folder
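A specification like this is typically loaded into a pipeline with kfp's component loader; a minimal sketch, assuming the YAML above is saved as component.yaml (the file path and the positional task arguments are assumptions):

import kfp.components as comp

DEPLOYMENT_CHECKER_OP = comp.load_component_from_file('component.yaml')
deployment_checker_task = DEPLOYMENT_CHECKER_OP(project_id, model_name, model_folder)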
Geremie / produce_large_data_using_python_function.py
Created February 28, 2021 18:26
Never struggle again to share data between your Kubeflow Pipelines components
from kfp.components import OutputPath

def download_blob(source_bucket_name: str, source_blob_name: str, dest_file_path: OutputPath()):
    # Imported inside the function so it still works when packaged as a standalone component.
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(source_bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(dest_file_path)
get_model_location_task = GET_MODEL_LOCATION_OP(bucket_name, model_folder)
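On the consuming side, a downstream component would declare an InputPath parameter and receive the file written above; a minimal sketch with a hypothetical consumer function:

from kfp.components import InputPath

def inspect_downloaded_file(data_path: InputPath()):
    # The pipeline backend mounts the upstream OutputPath file at data_path.
    import os
    print('downloaded bytes:', os.path.getsize(data_path))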
Geremie / consume_small_data.py
Created February 28, 2021 18:04
Never struggle again to share data between your Kubeflow Pipelines components
ml_engine_deploy_task = ML_ENGINE_DEPLOY_OP(
    model_uri=get_model_location_task.outputs['location'],
    project_id=project_id,
    model_id=MODEL_NAME,
    version_id='version_{}'.format(MODEL_FOLDER),
    runtime_version=RUNTIME_VERSION,
    python_version='3.5',
    set_default=True,
    wait_interval=WAIT_INTERVAL)
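ML_ENGINE_DEPLOY_OP itself is not shown in this preview; it would plausibly be loaded the same way as the train component later on this page, assuming a matching deploy component exists in the same release (the URL below is an assumption):

ML_ENGINE_DEPLOY_OP = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1.1.1-beta.1/components/gcp/ml_engine/deploy/component.yaml')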
Geremie / produce_small_data_using_python_function.py
Created February 28, 2021 17:33
Never struggle again to share data between your Kubeflow Pipelines components
from typing import NamedTuple

def get_model_location(bucket_name: str, model_folder: str) -> NamedTuple('returns', [('location', 'GCSPath')]):
    from google.cloud import storage

    # Exported SavedModels land under model/<model_folder>/export/exporter/<timestamp>/.
    folder = 'model/{}/export/exporter/'.format(model_folder)
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=folder)
    blob_name = None
    for blob in blobs:
        tmp = len(blob.name.rstrip('/').split('/'))
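To make this function usable as the GET_MODEL_LOCATION_OP called in the earlier snippet, it would be wrapped with kfp's function-to-component helper; a minimal sketch in which base_image and packages_to_install are assumptions:

import kfp.components as comp

GET_MODEL_LOCATION_OP = comp.create_component_from_func(
    func=get_model_location,
    base_image='python:3.7',                       # assumed
    packages_to_install=['google-cloud-storage'])  # assumed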
Geremie / externally_trigger_pipeline.py
Created December 16, 2020 12:06
Automate your ML model retraining with Kubeflow
import kfp

def trigger(project_id, bucket_name, train_steps, model_folder, host):
    client = kfp.Client(namespace='default', host=host)
    params = {
        'project_id': project_id,
        'bucket_name': bucket_name,
        'train_steps': train_steps,
        'model_folder': model_folder
    }
    # Attach the run to the first existing experiment.
    experiments = client.list_experiments()
    experiment_id = experiments.experiments[0].id
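The preview is truncated here; a plausible continuation submits a run against an already-uploaded pipeline (the job naming and the pipeline_id variable are assumptions):

    client.run_pipeline(
        experiment_id=experiment_id,
        job_name='retraining-{}'.format(model_folder),  # assumed naming
        pipeline_id=pipeline_id,  # hypothetical: id of the uploaded training pipeline
        params=params)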
Geremie / conditional_retraining_trigger.py
Created December 16, 2020 11:58
Automate your ML model retraining with Kubeflow
# The trigger step runs only when the check task emits the string 'yes'.
with dsl.Condition(is_retraining_needed_task.output == 'yes'):
    trigger_model_training_pipeline_task = \
        trigger_model_training_pipeline_op(project_id, bucket_name, train_steps, model_folder, host)
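is_retraining_needed_task would come from a component that emits 'yes' or 'no'; one hypothetical implementation as a lightweight Python component (the rule and all names are assumptions):

def is_retraining_needed(current_rmse: float, threshold: float) -> str:
    # Hypothetical rule: request retraining once the error drifts above a threshold.
    return 'yes' if current_rmse > threshold else 'no'

IS_RETRAINING_NEEDED_OP = comp.create_component_from_func(func=is_retraining_needed)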
Geremie / store_metrics_component.py
Created December 16, 2020 10:24
Automate your ML model retraining with Kubeflow
def store_training_job_metrics_op(project_id: str, metrics_file_path: str, model_name: str, model_folder: str):
    # Factory returning a step that runs the metrics_writer module from a custom image.
    return dsl.ContainerOp(
        name='store-training-job-metrics',
        image='gcr.io/{}/taxi-fare-utils:1.0'.format(project_id),
        command=['python3', '-m', 'metrics_writer'],
        arguments=[
            '--metrics_file_path', metrics_file_path,
            '--model_name', model_name,
            '--model_folder', model_folder
        ])
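Inside a pipeline function the factory is invoked like any other step; a short usage sketch, where download_metrics_task is an assumed upstream step that fetched the metrics file (see the next snippet):

store_metrics_task = store_training_job_metrics_op(
    project_id, download_metrics_task.output, MODEL_NAME, model_folder)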
Geremie / donwload_metrics_file_and_create_component.py
Created December 16, 2020 10:16
Automate your ML model retraining with Kubeflow
def download_blob(source_bucket_name: str, source_blob_name: str, dest_file_path: OutputPath()):
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(source_bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(dest_file_path)

DOWNLOAD_BLOB_OP = comp.create_component_from_func(func=download_blob,
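The call is cut off mid-signature; a plausible completion, matching kfp v1's create_component_from_func signature (the remaining arguments are assumptions):

DOWNLOAD_BLOB_OP = comp.create_component_from_func(
    func=download_blob,
    base_image='python:3.7',                       # assumed
    packages_to_install=['google-cloud-storage'])  # assumed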
Geremie / load_and_use_component.py
Created December 16, 2020 09:55
Automate your ML model retraining with Kubeflow
ML_ENGINE_TRAIN_OP = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1.1.1-beta.1/components/gcp/ml_engine/train/component.yaml')

ml_engine_train_task = ML_ENGINE_TRAIN_OP(
    project_id=project_id,
    python_module=PYTHON_MODULE,
    package_uris=package_uris,
    region=REGION,
    args=args,
    job_dir=job_dir,
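The call is truncated here; once the remaining arguments are supplied, the surrounding pipeline would be compiled and submitted in the usual kfp v1 way, e.g. (the pipeline function name and package path are assumptions):

import kfp.compiler

kfp.compiler.Compiler().compile(retraining_pipeline, 'retraining_pipeline.tar.gz')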