Skip to content

Instantly share code, notes, and snippets.

@hussainsultan
Created May 20, 2025 15:47
Show Gist options
  • Save hussainsultan/89237d9a690e0d4b7b0917ed4e43bdbc to your computer and use it in GitHub Desktop.
Save hussainsultan/89237d9a690e0d4b7b0917ed4e43bdbc to your computer and use it in GitHub Desktop.
import pandas as pd
import numpy as np
import xorq as xo
from xorq.expr.udf import make_pandas_udf
import xorq.vendor.ibis.expr.datatypes as dt
def normalize_feature(df):
    """Return the z-score of the single column held by ``df``.

    The column is looked up by position rather than by name, so the frame
    may carry any column label as long as it has exactly one column.

    Args:
        df: A pandas DataFrame with exactly one column (assumed numeric).

    Returns:
        A pandas Series: each value minus the column mean, divided by the
        column's standard deviation as computed by ``Series.std()``.

    Raises:
        ValueError: If ``df`` does not have exactly one column, because the
            single-element unpacking below fails.
    """
    # Single-element unpacking doubles as a "exactly one column" check.
    (column_name,) = df.columns
    column = df[column_name]
    return (column - column.mean()) / column.std()
# The column name declared here is a placeholder: it is not validated when the
# UDF executes, which conveniently lets one UDF serve differently named columns.
schema_norm = xo.schema({"some_random": "float64"})

# Wrap normalize_feature as a pandas UDF: one float64 input, float64 output.
normalize_feature_udf = make_pandas_udf(
    normalize_feature,
    schema_norm,
    dt.float64,
    name="normalize_feature",
)
# Define a stub ML model processing function
def run_ml_model(df: pd.DataFrame) -> pd.DataFrame:
    """Stub model: return ``df`` with a simulated ``prediction`` column added.

    The real model implementation could be anything:
      - a complex ensemble model with intricate state management
      - an implementation that requires specific libraries
      - code with concurrency requirements
      - legacy code that's difficult to refactor
    In this simple example a prediction is merely simulated.

    Args:
        df: DataFrame containing at least ``feature1`` and ``feature2``.

    Returns:
        A new DataFrame (via ``DataFrame.assign``) holding the input columns
        plus ``prediction`` = 0.5 * feature1 + 0.3 * feature2 + noise.
    """
    # Noise is drawn from numpy's global RNG, so results vary between calls
    # unless the caller seeds ``np.random`` beforehand.
    noise = np.random.normal(0, 0.1, size=len(df))
    predictions = df["feature1"] * 0.5 + df["feature2"] * 0.3 + noise
    return df.assign(prediction=predictions)
# Columns the model function receives.
schema_in = xo.schema(
    {
        "feature1": "float64",
        "feature2": "float64",
        "target": "float64",
    }
)

# Columns the model function returns: the inputs plus the added prediction.
schema_out = xo.schema(
    {
        "feature1": "float64",
        "feature2": "float64",
        "target": "float64",
        "prediction": "float64",
    }
)

# Expose run_ml_model as a flight UDXF, declaring both schemas in pyarrow form.
ml_model_udxf = xo.expr.relations.flight_udxf(
    process_df=run_ml_model,
    maybe_schema_in=schema_in.to_pyarrow(),
    maybe_schema_out=schema_out.to_pyarrow(),
    name="MLModelPredictor",
)
def create_pipeline():
    """Build the demo pipeline: register data, normalize features, run model.

    Generates 100 rows of random ``feature1`` / ``feature2`` / ``target``
    values, registers them as the table ``"features"`` on a fresh xorq
    connection, replaces both feature columns with their normalized versions,
    and pipes the result through ``ml_model_udxf``.

    Returns:
        The expression produced by piping the normalized table through
        ``ml_model_udxf``.
    """
    con = xo.connect()
    data = pd.DataFrame(
        {
            "feature1": np.random.normal(0, 1, 100),
            "feature2": np.random.normal(0, 1, 100),
            "target": np.random.normal(0, 1, 100),
        }
    )
    table = con.register(data, "features")

    # Apply the UDF to each feature column explicitly. Passing the bare
    # ``normalize_feature_udf.on_expr`` callable for both assignments gave
    # mutate() the same callable twice, so neither assignment named the
    # column it was supposed to normalize.
    normalized = table.mutate(
        feature1=normalize_feature_udf(table.feature1),
        feature2=normalize_feature_udf(table.feature2),
    )

    predictions = normalized.pipe(ml_model_udxf)
    return predictions
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment