Skip to content

Instantly share code, notes, and snippets.

View tuulos's full-sized avatar

Ville Tuulos tuulos

View GitHub Profile
from metaflow import FlowSpec, step
class HelloWorldFlow(FlowSpec):
@step
def start(self):
print("This is start step")
import time
print("<<BEGIN>> 10")
for i in range(10):
@tuulos
tuulos / analyze_artifacts.py
Last active August 18, 2021 06:04
Analyze artifacts
from metaflow import Parameter, current, Flow, step, FlowSpec
from functools import wraps
class ArtifactProxy:
def __init__(self, flow):
params = {x.id for x in Flow(current.flow_name)[current.run_id]['_parameters'].task}
base = {x for x in dir(flow) if x not in params}
self.__dict__.update({
'_artifacts_flow': flow,
from metaflow import FlowSpec, step, Parameter, IncludeFile, catch
import math, time, uuid, datetime, random, string, sys
from decimal import Decimal
import requests
class CustomClass:
    """Object whose string form is 1024**2 repetitions of the letter ``a``."""

    def __str__(self):
        """Return a string made of 1,048,576 ``'a'`` characters."""
        # 1024 ** 2 is already an int, so no int() conversion is required.
        return 'a' * (1024 ** 2)
@tuulos
tuulos / magicdir.py
Created February 5, 2022 00:57
magic dir
from metaflow import FlowSpec, step
from functools import wraps
from functools import wraps
dir = 'mydir'
def magicdir(f):
artifact = 'magicdir'
@wraps(f)
from metaflow import FlowSpec, step, Parameter, resources, conda_base, profile
@conda_base(python='3.8.3', libraries={'scikit-learn': '0.24.1'})
class ManyKmeansFlow(FlowSpec):
num_docs = Parameter('num-docs', help='Number of documents', default=1000000)
@resources(memory=4000)
@step
def start(self):
import sys
def baseline(k):
    """Count occurrences of the digit 1 in the integers 0 through ``k``.

    Every integer in ``range(k + 1)`` is rendered in decimal and its
    ``'1'`` characters are tallied.

    Args:
        k: Inclusive upper bound of the integers to scan.

    Returns:
        The total number of ``'1'`` digits seen. The result is 0 when
        ``k`` is negative, because the range is then empty.
    """
    return sum(str(i).count('1') for i in range(k + 1))
def modulo(k):
if k == 0:
@tuulos
tuulos / config_train.py
Created February 4, 2023 00:52
Train a model with a config file using Metaflow
from metaflow import FlowSpec, step, IncludeFile
def dataset_wine():
    """Load the wine dataset that ships with scikit-learn.

    scikit-learn is imported inside the function body, so the package is
    only required when this function is actually called.

    Returns:
        The result of ``load_wine(return_X_y=True)``; per the scikit-learn
        documentation this is a ``(data, target)`` pair.
    """
    from sklearn.datasets import load_wine

    return load_wine(return_X_y=True)
def model_knn(train_data, train_labels):
from sklearn.neighbors import KNeighborsClassifier
model = KNeighborsClassifier()
model.fit(train_data, train_labels)
@tuulos
tuulos / s3dir.py
Created March 10, 2023 06:43
Sync full directories to/from S3
import os
from metaflow import S3
def put_dir(local_root, s3root):
root = os.path.abspath(local_root)
objs = []
for p, _, files in os.walk(root):
for f in files:
path = os.path.join(p, f)
key = os.path.relpath(path, start=root)
import random
from metaflow import FlowSpec, step, S3, Flow, Parameter, profile, kubernetes, conda, conda_base
# change columns according to your schema (or remove column list to load all)
COLUMNS = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
# group parquet files as 1GB batches
def shard_data(src, batch_size=1_000_000_000):
with S3() as s3:
objs = s3.list_recursive([src])
@tuulos
tuulos / dump_data.py
Created May 12, 2023 18:56
export Metaflow tasks in a CSV
from metaflow import namespace, Metaflow, Run
def fmt(t):
    """Format ``t`` as ``YYYY-MM-DDTHH:MM:SSZ``.

    The trailing ``Z`` denotes UTC, so a timezone-aware datetime is first
    converted to UTC; otherwise its local wall-clock time would be
    mislabeled as UTC. A naive datetime carries no offset to convert from
    and is formatted unchanged.
    NOTE(review): naive values are assumed to already be in UTC -- confirm
    against the caller.

    Args:
        t: A ``datetime`` (or any object providing ``strftime``).

    Returns:
        The formatted timestamp string, with second resolution.
    """
    # Imported locally so the function does not depend on file-level imports.
    from datetime import datetime, timezone

    # utcoffset() is None for naive datetimes; only aware ones are shifted.
    if isinstance(t, datetime) and t.utcoffset() is not None:
        t = t.astimezone(timezone.utc)
    return t.strftime('%Y-%m-%dT%H:%M:%SZ')
print('flow,run,step,task,created,finished,user,runtime,pod_id,pod_name')
namespace(None)
for flow in Metaflow():
for run in flow:
if run.successful: