Ben-Epstein / prefect-slack-notify.md
Last active March 24, 2025 17:39
Prefect Slack notifications on flow-run failure

Slack Notifications through Prefect

It's pretty easy to get going with Prefect, but when jobs fail there are no native notifications to let you know.

This is the easiest way to set up a Slack notification system for failed and crashed flow runs (or any other state you care about).

Set up a Slack bot

  1. Go to https://api.slack.com/apps
  2. Create a new app -> From a Manifest -> Update the name "Prefect Workflow Failures" (or whatever you want)
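Once the app exists and you have a webhook (or bot token) for your channel, the Prefect side can be wired up with state-change hooks. Here's a minimal sketch assuming Prefect 2.x and a `SLACK_WEBHOOK_URL` environment variable — the env var name and message format are placeholders of mine, not part of this gist:

```python
import os

import requests
from prefect import flow


def notify_slack(flow, flow_run, state):
    """Post to Slack when a flow run ends in a state we care about."""
    webhook_url = os.environ["SLACK_WEBHOOK_URL"]  # assumed env var
    requests.post(
        webhook_url,
        json={"text": f"Flow run {flow_run.name!r} finished as {state.name}"},
        timeout=10,
    )


@flow(on_failure=[notify_slack], on_crashed=[notify_slack])
def my_flow():
    raise RuntimeError("boom")  # any failure here fires the hook
```

The same hook can be attached for other states via the corresponding `on_*` flow arguments (e.g. `on_completion`, `on_cancellation`).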

My personal setup preference (macOS)

# Vim basics: syntax highlighting and line numbers
printf 'syntax on\nset nu\n' >> ~/.vimrc
# Install oh-my-zsh
sh -c "$(curl -fsSL https://raw.githubusercontent.com/ohmyzsh/ohmyzsh/master/tools/install.sh)"
# Clone the plugins (fall back to the default custom dir if ZSH_CUSTOM is unset)
git clone https://github.com/zsh-users/zsh-autosuggestions.git ${ZSH_CUSTOM:-$HOME/.oh-my-zsh/custom}/plugins/zsh-autosuggestions
git clone https://github.com/zsh-users/zsh-syntax-highlighting.git ${ZSH_CUSTOM:-$HOME/.oh-my-zsh/custom}/plugins/zsh-syntax-highlighting
# Enable the plugins (BSD sed requires the '' after -i)
sed -i '' "s/plugins=(git)/plugins=(git zsh-autosuggestions zsh-syntax-highlighting)/g" ~/.zshrc
# Install and enable the powerlevel10k theme, then restart the shell (exec zsh)
git clone --depth=1 https://github.com/romkatv/powerlevel10k.git ${ZSH_CUSTOM:-$HOME/.oh-my-zsh/custom}/themes/powerlevel10k
sed -i '' "s/robbyrussell/powerlevel10k\/powerlevel10k/g" ~/.zshrc
Ben-Epstein / api.json
Created March 21, 2021 17:48
Feature Store Swagger api.json
{"openapi":"3.0.2","info":{"title":"Feature Store API","description":"API for asynchronous and synchronous calls to the feature store","version":"0.1.0"},"paths":{"/health":{"get":{"tags":["Mgmt"],"summary":"Health Check","description":"Health check","operationId":"healthcheck","responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{"title":"Response Health Check Health Get","type":"string"}}}}}}},"/feature-sets":{"get":{"tags":["Feature Sets"],"summary":"Get Feature Sets","description":"Returns a list of available feature sets","operationId":"get_feature_sets","parameters":[{"required":false,"schema":{"title":"Name","type":"array","items":{"type":"string"},"default":[]},"name":"name","in":"query"}],"responses":{"200":{"description":"Successful Response","content":{"application/json":{"schema":{"title":"Response Get Feature Sets Feature Sets Get","type":"array","items":{"$ref":"#/components/schemas/FeatureSet"}}}}},"422":{"description":"Validation Error","content":{"ap
Ben-Epstein / get_training_set.py
Created February 11, 2021 14:38
Getting a training set from a feature store
# `fs` is assumed to be a feature-store client already in scope (these gists
# elsewhere use Splice Machine, whose feature store exposes get_training_set);
# the call joins the requested features into a single training DataFrame
df = fs.get_training_set(
    features=[
        'max_spending_category_30d',
        'max_purchase_amount_15d',
        'spending_avg_7_day',
        'spending_avg_15_day',
        'spending_avg_60_day',
        'customer_churn',
    ]
)
# Tear down: stop the Spark session and close the active MLflow run
spark.stop()
mlflow.end_run()
%%sql
-- With the model deployed to the database (deploy_db below), newly inserted
-- rows should be scored automatically in-database
insert into iris_model (sepal_length, sepal_width, petal_length, petal_width, moment_id) values (5.1, 3.5, 1.4, 0.2, 0);
insert into iris_model (sepal_length, sepal_width, petal_length, petal_width, moment_id) values (6.4, 2.7, 5.3, 2.0, 1);
select * from iris_model;
# Splice Machine's MLflow extensions: deploy the trained model to a database
# table so that inserted rows are scored in-database
schema = splice.getConnection().getSchema()
run_id = mlflow.current_run_id()
splice.dropTableIfExists(schema, 'iris_model')
jid = mlflow.deploy_db(schema, 'iris_model', run_id, create_model_table=True,
                       df=df.select(cols[:-1]), primary_key={'MOMENT_ID': 'INT'},
                       classes=list(data.target_names))
mlflow.watch_job(jid)  # stream the deployment job's progress
# Log pipeline metadata and the model to MLflow; log_pipeline_stages and
# log_feature_transformations are Splice Machine extensions to the mlflow client
mlflow.log_pipeline_stages(model)
mlflow.log_feature_transformations(model)
mlflow.log_param('maxDepth', model.stages[-1].getOrDefault('maxDepth'))
mlflow.log_param('maxBins', model.stages[-1].getOrDefault('maxBins'))
mlflow.log_model(model, 'spark_dt') # Important! The deploy step needs a logged model
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
va = VectorAssembler(inputCols=cols[:-1], outputCol='features') # Define feature vector
dt = DecisionTreeClassifier() # Define model
pipeline = Pipeline(stages=[va, dt]) # Chain steps together into a full pipeline
train, test = df.randomSplit([0.8, 0.2])  # 80/20 train/test split
model = pipeline.fit(train)
Ben-Epstein / Load Iris Data into Spark.py
Created November 5, 2020 15:26
Load Iris Data into Spark
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np
data = load_iris()
cols = [i.replace('(cm)','').strip().replace(' ','_') for i in data.feature_names] + ['label'] # Column name cleanup
pdf = pd.DataFrame(np.c_[data.data, data.target], columns=cols)  # features + target in one frame
df = spark.createDataFrame(pdf)  # pandas -> Spark DataFrame
df.show()