Created
May 20, 2025 15:47
-
-
Save hussainsultan/89237d9a690e0d4b7b0917ed4e43bdbc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import numpy as np | |
import xorq as xo | |
from xorq.expr.udf import make_pandas_udf | |
import xorq.vendor.ibis.expr.datatypes as dt | |
def normalize_feature(df: pd.DataFrame) -> pd.Series:
    """Standardize the single column of ``df`` to zero mean and unit variance.

    Args:
        df: A DataFrame holding exactly one column. The column's name is
            irrelevant; it is discovered by unpacking ``df.columns``.

    Returns:
        A Series of ``(value - mean) / std`` using the sample standard
        deviation that ``Series.std()`` computes. When the standard deviation
        is zero or undefined (constant column, single row, empty input), the
        centered values are scaled by zero instead, so finite inputs map to
        ``0.0`` rather than NaN/inf while missing values stay NaN.

    Raises:
        ValueError: If ``df`` does not have exactly one column (raised by the
            tuple unpacking).
    """
    (column_name,) = df.columns
    column = df[column_name]
    centered = column - column.mean()
    spread = column.std()
    # ``not spread > 0`` is deliberately used instead of ``spread == 0`` so
    # that a NaN standard deviation (fewer than two rows) takes this branch.
    if not spread > 0:
        return centered * 0.0
    return centered / spread
# The column name in this schema is only a placeholder: it is not validated
# when the UDF executes, which happens to be convenient because the function
# itself works on whatever single column it is handed.
schema_norm = xo.schema({"some_random": "float64"})

# Wrap the pandas function as a UDF that takes one float64 input and returns
# float64.
normalize_feature_udf = make_pandas_udf(
    normalize_feature,
    schema_norm,
    dt.float64,
    name="normalize_feature",
)
# Stub standing in for a real ML model's processing function.
def run_ml_model(df: pd.DataFrame):
    """Return ``df`` with an added ``prediction`` column of simulated scores.

    The real model behind this entry point could be anything, for example:
      - a complex ensemble model with intricate state management
      - an implementation that requires specific libraries
      - code with concurrency requirements
      - legacy code that is difficult to refactor

    Here the prediction is merely simulated as
    ``0.5 * feature1 + 0.3 * feature2`` plus Gaussian noise (mean 0,
    standard deviation 0.1) drawn from NumPy's global random state, so the
    output differs between calls unless that state is seeded.

    Args:
        df: Frame that must contain ``feature1`` and ``feature2`` columns.

    Returns:
        A new frame produced by ``df.assign`` holding every input column
        plus ``prediction``.
    """
    weighted_sum = df["feature1"] * 0.5 + df["feature2"] * 0.3
    noise = np.random.normal(0, 0.1, size=len(df))
    simulated = weighted_sum + noise
    return df.assign(prediction=simulated)
# Input and output schemas for the model step: the output is the input plus
# the float64 ``prediction`` column that ``run_ml_model`` appends.
schema_in = xo.schema(
    {
        "feature1": "float64",
        "feature2": "float64",
        "target": "float64",
    }
)
schema_out = xo.schema(
    {
        "feature1": "float64",
        "feature2": "float64",
        "target": "float64",
        "prediction": "float64",
    }
)

# Expose ``run_ml_model`` as a Flight UDXF; both schemas are handed over in
# their pyarrow form.
ml_model_udxf = xo.expr.relations.flight_udxf(
    process_df=run_ml_model,
    maybe_schema_in=schema_in.to_pyarrow(),
    maybe_schema_out=schema_out.to_pyarrow(),
    name="MLModelPredictor",
)
def create_pipeline():
    """Build the demo pipeline: register data, normalize features, predict.

    Steps:
      1. Open a default xorq connection and register a 100-row frame of
         standard-normal random ``feature1``, ``feature2`` and ``target``
         values under the table name ``"features"``.
      2. Replace ``feature1`` and ``feature2`` with their normalized versions
         by applying ``normalize_feature_udf`` to each column.
      3. Pipe the normalized table through ``ml_model_udxf``.

    Returns:
        The expression produced by piping the normalized table through
        ``ml_model_udxf``. Nothing in this function executes it explicitly.
    """
    con = xo.connect()
    data = pd.DataFrame(
        {
            "feature1": np.random.normal(0, 1, 100),
            "feature2": np.random.normal(0, 1, 100),
            "target": np.random.normal(0, 1, 100),
        }
    )
    table = con.register(data, "features")

    # Apply the UDF to each column explicitly. Handing ``mutate`` the same
    # ``normalize_feature_udf.on_expr`` callable for both names gives it no
    # way to choose a different input column per feature: both entries would
    # be resolved against the same table in the same way.
    normalized = table.mutate(
        feature1=normalize_feature_udf(table["feature1"]),
        feature2=normalize_feature_udf(table["feature2"]),
    )

    predictions = normalized.pipe(ml_model_udxf)
    return predictions
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment