Skip to content

Instantly share code, notes, and snippets.

@slopp
Last active August 28, 2024 13:03
Show Gist options
  • Save slopp/ad09cb7002c4dff899e3989e5f991c8e to your computer and use it in GitHub Desktop.
Save slopp/ad09cb7002c4dff899e3989e5f991c8e to your computer and use it in GitHub Desktop.
Palmer Penguins ML Workflow with Dagster
import datetime
import pins
import os
import seaborn as sns
from dagster import asset, asset_check, AssetCheckResult
from posit import connect # install as uv pip install posit-sdk
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder, StandardScaler
# Posit Connect settings, all read from the environment.
# Server URL and API key are None when the variable is unset.
CONNECT_SERVER = os.getenv("CONNECT_SERVER")
CONNECT_API_KEY = os.getenv("CONNECT_API_KEY")
# Owner prefix used in the pin names written by deployed_model
# ("<user>/encoder", "<user>/pipeline"). Defaults to the original placeholder;
# set CONNECT_USER_NAME instead of editing this file.
CONNECT_USER_NAME = os.getenv("CONNECT_USER_NAME", "USER_NAME")
# GUID of the Connect content item referenced by the (commented-out) restart
# code in the_app. Defaults to the original placeholder.
CONNECT_CONTENT_GUID = os.getenv("CONNECT_CONTENT_GUID", "CONTENT_GUID")
@asset
def prepared_data():
    """Load the seaborn ``penguins`` dataset and integer-encode its species.

    Any row with a missing value in any column is dropped before encoding.

    Returns:
        dict with two entries:
            ``"penguins"`` -- the cleaned DataFrame, whose ``species`` column
                now holds the integer codes produced by the encoder;
            ``"encoder"`` -- the fitted ``LabelEncoder``, kept so codes can
                be mapped back to species names downstream.
    """
    species_encoder = LabelEncoder()
    # cache=False: do not read from / save to seaborn's local dataset cache.
    cleaned = sns.load_dataset("penguins", cache=False).dropna()
    cleaned = cleaned.assign(
        species=species_encoder.fit_transform(cleaned["species"])
    )
    return {"penguins": cleaned, "encoder": species_encoder}
@asset
def trained_model(prepared_data):
    """Fit a standardize-then-logistic-regression pipeline on the penguins.

    The four numeric measurements are the features and the encoded
    ``species`` column is the target. 20% of the rows are held out
    (``random_state=0`` for reproducibility) so a downstream check can score
    the model on unseen data.

    Args:
        prepared_data: dict from the ``prepared_data`` asset; reads its
            ``"penguins"`` DataFrame and ``"encoder"``.

    Returns:
        dict with the fitted ``"pipeline"``, the pass-through ``"encoder"``,
        and the held-out ``"X_test"`` / ``"y_test"`` split.
    """
    feature_columns = [
        "bill_length_mm",
        "bill_depth_mm",
        "flipper_length_mm",
        "body_mass_g",
    ]
    frame = prepared_data["penguins"]
    features_train, features_test, labels_train, labels_test = train_test_split(
        frame[feature_columns], frame["species"], test_size=0.2, random_state=0
    )

    model_pipeline = Pipeline(
        steps=[
            ("scaler", StandardScaler()),
            ("model", LogisticRegression()),
        ]
    )
    model_pipeline.fit(features_train, labels_train)

    return {
        "pipeline": model_pipeline,
        "encoder": prepared_data["encoder"],
        "X_test": features_test,
        "y_test": labels_test,
    }
@asset_check(asset=trained_model)
def evaluate_model(trained_model):
    """Check that the trained pipeline scores at least 0.9 on the held-out split.

    Args:
        trained_model: dict from the ``trained_model`` asset; reads its
            ``"pipeline"``, ``"X_test"`` and ``"y_test"`` entries.

    Returns:
        AssetCheckResult that passes when the test-set score is >= 0.9, with
        the observed score and the threshold attached as check metadata so
        they are visible alongside the check outcome.
    """
    min_score = 0.9
    pipeline = trained_model["pipeline"]
    # Depending on the scikit-learn version, .score() may return a NumPy
    # scalar. Comparing that gives numpy.bool_, not the built-in bool that
    # AssetCheckResult.passed expects, so convert to a plain float up front.
    score = float(pipeline.score(trained_model["X_test"], trained_model["y_test"]))
    print(f"Model score: {score}")
    return AssetCheckResult(
        passed=score >= min_score,
        metadata={"score": score, "min_score": min_score},
    )
@asset
def deployed_model(trained_model):
    """Publish the fitted encoder and pipeline as joblib pins on Posit Connect.

    Args:
        trained_model: dict from the ``trained_model`` asset; its
            ``"encoder"`` and ``"pipeline"`` entries are written as the pins
            ``<CONNECT_USER_NAME>/encoder`` and ``<CONNECT_USER_NAME>/pipeline``.

    Returns:
        None. The asset's effect is the two pin writes.

    Raises:
        ValueError: if the ``CONNECT_SERVER`` or ``CONNECT_API_KEY`` setting
            is empty/unset. This names the missing setting up front rather
            than letting the failure surface somewhere inside ``pins``.
    """
    print("Deploying model")
    missing = [
        setting
        for setting, value in (
            ("CONNECT_SERVER", CONNECT_SERVER),
            ("CONNECT_API_KEY", CONNECT_API_KEY),
        )
        if not value
    ]
    if missing:
        raise ValueError(
            "Cannot deploy model; missing environment variable(s): "
            + ", ".join(missing)
        )

    board = pins.board_connect(
        CONNECT_SERVER,
        api_key=CONNECT_API_KEY,
        cache=None,
        # NOTE(review): this asset only writes pins. allow_pickle_read
        # presumably only gates pin_read of pickle/joblib data, so it looks
        # unnecessary here. Confirm and consider dropping it.
        allow_pickle_read=True,
    )
    board.pin_write(
        trained_model["encoder"],
        name=f"{CONNECT_USER_NAME}/encoder",
        type="joblib",
    )
    board.pin_write(
        trained_model["pipeline"],
        name=f"{CONNECT_USER_NAME}/pipeline",
        type="joblib",
    )
    return None
@asset(deps=[deployed_model])
def the_app():
    """Placeholder asset for restarting the Posit Connect content item.

    It is declared downstream of ``deployed_model`` so it runs after the pins
    are written. The restart logic below is currently commented out, so the
    asset does nothing and returns None. Presumably the content identified by
    ``CONNECT_CONTENT_GUID`` is an app that reads the new pins. Confirm before
    enabling the restart.
    """
    # client = connect.Client(CONNECT_SERVER, CONNECT_API_KEY)
    # content = client.content.get(CONNECT_CONTENT_GUID)
    # print(f"Restarting {content.dashboard_url}")
    # content.restart()
    return None
@slopp
Copy link
Author

slopp commented Aug 21, 2024

Screen Shot 2024-08-20 at 6 23 13 PM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment