Created
December 23, 2022 13:32
-
-
Save MatthewRalston/e6ef762be10751e806abee90b5df7565 to your computer and use it in GitHub Desktop.
A simple Kubeflow pipeline step that determines whether a file needs decompression
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python | |
import os | |
import sys | |
import argparse | |
import kfp.dsl as dsl | |
import kfp.components as comp | |
import kfp | |
import logging | |
# Module-wide logger handle; stays None until the script entry point assigns it.
logger = None
def get_root_logger(level):
    """Configure the root logger for the requested verbosity and return it.

    Args:
        level: Verbosity index: 0 selects WARNING, 1 selects INFO and
            2 selects DEBUG.

    Returns:
        The root logger, i.e. ``logging.getLogger()``.

    Raises:
        TypeError: If ``level`` is outside 0-2. A ``ValueError`` would be the
            conventional choice, but the exception type is kept so existing
            callers that catch ``TypeError`` keep working.
    """
    levels = [logging.WARNING, logging.INFO, logging.DEBUG]
    if level < 0 or level > 2:
        raise TypeError("{0}.get_root_logger expects a verbosity between 0-2".format(__file__))
    # NOTE: logging.basicConfig() is a no-op when the root logger already has
    # handlers, so only the first effective call decides the format.
    logging.basicConfig(
        level=levels[level],
        format="%(levelname)s: %(asctime)s %(funcName)s L%(lineno)s| %(message)s",
        # 24-hour clock (%H): the previous 12-hour %I had no AM/PM marker,
        # which made morning and afternoon timestamps indistinguishable.
        datefmt="%Y/%m/%d %H:%M:%S",
    )
    return logging.getLogger()
# Path of the "components" directory that sits alongside this file.
components_dir = os.path.join(os.path.split(__file__)[0], "components")
#gunzip = comp.load_component_from_file(os.path.join(components_dir, "gunzip.yaml"))
@dsl.component
def is_unzip_needed(infile: str) -> bool:
    """Report whether ``infile`` looks gzip-compressed, judged by its name.

    Only the ``.gz`` suffix of the given string is inspected; the file itself
    is never opened.

    Args:
        infile: Path or file name to check.

    Returns:
        True when ``infile`` ends with ".gz", otherwise False.

    Raises:
        TypeError: If ``infile`` is not a string.
    """
    # isinstance (rather than an exact type() comparison) also accepts str
    # subclasses, which behave identically for the suffix check below.
    if not isinstance(infile, str):
        raise TypeError("pipeline.is_unzip_needed() expects a str as its only positional argument")
    return infile.endswith(".gz")
@dsl.container_component
def gunzip(infile: dsl.InputPath(str), outfile: dsl.OutputPath(str)):
    """Decompress ``infile`` into ``outfile``, or copy it over if that fails.

    Infile like 'path/to/example.txt.gz', outfile like 'path/to/example.txt'.

    Args:
        infile: Path of the (possibly gzip-compressed) input file.
        outfile: Path the decompressed content is written to.

    Returns:
        A ``dsl.ContainerSpec`` describing the container invocation.
    """
    # NOTE(review): confirm that dsl.InputPath is an accepted annotation for
    # container components in the installed KFP version; the signature is
    # left untouched here to keep the component interface stable.
    return dsl.ContainerSpec(
        image='bitnami/minideb:latest',
        # The command list is executed directly, not through a shell, so the
        # "||" fallback and the positional parameters need an explicit
        # `sh -c`. gunzip writes to stdout (-c) so the result lands at the
        # declared output path instead of next to the input file.
        command=[
            'sh',
            '-c',
            'mkdir -p "$(dirname "$2")" && '
            '{ gunzip -c "$1" > "$2" || cp "$1" "$2"; }',
        ],
        # With `sh -c SCRIPT a b c`, a becomes $0, so a placeholder name is
        # passed first to make infile $1 and outfile $2. The fallback copies
        # rather than moves so the input file is left intact.
        args=['gunzip', infile, outfile],
    )
@dsl.pipeline(
    name='kubeflow-barebone-demo',
    description='kubeflow demo with minimal setup'
)
def rnaseq_pipeline(fastq1: str):
    """Decompress the input FASTQ file when its name says it is gzipped.

    Args:
        fastq1: Path of the FASTQ file, e.g. 'reads.fastq.gz'.
    """
    # Step 1: decide, at pipeline run time, whether fastq1 needs gunzipping.
    unzip_is_needed = is_unzip_needed(infile=fastq1)

    # Step 2: gunzip only when step 1 reported True. The branch has to be a
    # dsl.Condition: this function body runs once at compile time, where a
    # plain Python `if` on the task object is always truthy. The explicit
    # `== True` builds the run-time condition expression; it is not a
    # redundant boolean comparison.
    with dsl.Condition(unzip_is_needed.output == True, name='fastq1-needs-gunzip'):  # noqa: E712
        # NOTE(review): fastq1 is a str pipeline parameter while gunzip
        # declares `infile` as dsl.InputPath(str); confirm the two are
        # type-compatible in the installed KFP version.
        # `outfile` is an output path supplied by KFP, so it is not passed.
        gunzip(infile=fastq1)

    # TODO: add the downstream steps (e.g. evaluation) that consume the
    # decompressed file.
if __name__ == "__main__":
    # Verbosity 2 selects the most detailed level get_root_logger offers.
    logger = get_root_logger(2)
    # Compile the pipeline definition into 'pipeline.yaml'.
    pipeline_compiler = kfp.compiler.Compiler()
    pipeline_compiler.compile(rnaseq_pipeline, 'pipeline.yaml')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment