Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jeanmidevacc/b09572f8eeea178bc80131a7763904c5 to your computer and use it in GitHub Desktop.
Save jeanmidevacc/b09572f8eeea178bc80131a7763904c5 to your computer and use it in GitHub Desktop.
import numpy as np
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Window
def build_recommendations(broadcast_dfp_items_factors, k=5):
    """Create a Spark UDF that ranks items for one user from broadcast item factors.

    Args:
        broadcast_dfp_items_factors: Broadcast variable whose ``.value`` is a
            pandas DataFrame with an ``id`` column and a ``features`` column
            (one factor vector per item).
        k: Default number of item ids returned per user.

    Returns:
        A Spark UDF, marked non-deterministic, declared to return an array of
        strings. It takes the user factor vector and the collection of item
        ids to exclude, and returns the ids of the ``k`` best-scored items.
    """

    def build_recommendations_(user_factors, inventory_itemid_encoded, k=k):
        dfp_items_factors = broadcast_dfp_items_factors.value

        # A left join on the caller side yields None for users without any
        # inventory, and Series.isin() rejects a non list-like argument.
        if inventory_itemid_encoded is None:
            excluded_ids = []
        else:
            excluded_ids = list(inventory_itemid_encoded)

        # Fetch only the factors of the items that can be recommended;
        # add your own business rules here.
        is_candidate = ~dfp_items_factors["id"].isin(excluded_ids)

        # Predict the rating of every candidate. assign() builds a new frame
        # instead of writing a column into a filtered slice, which avoids
        # pandas' SettingWithCopyWarning.
        dfp_items_factors_to_rank = dfp_items_factors[is_candidate].assign(
            predicted_rating=lambda dfp: dfp["features"].apply(
                lambda features: np.dot(features, user_factors)
            )
        )

        # Best predicted rating first, then keep only the top k items.
        dfp_top_items = dfp_items_factors_to_rank.sort_values(
            "predicted_rating", ascending=False
        ).head(k)
        return dfp_top_items["id"].tolist()

    return F.udf(
        build_recommendations_, T.ArrayType(T.StringType())
    ).asNondeterministic()
# Prerequisite: a Spark session (`spark`) and a fitted ALS model (`model`).
# Pull the item factors to the driver as a pandas DataFrame.
dfp_items_factors = model.itemFactors.select("id", "features").toPandas()

# Start the recommendations dataframe from the user factors, attaching to each
# user the encoded item ids of their history that must not be recommended.
dfs_recommendations = model.userFactors.join(
    dfs_inventory_itemid_encoded,
    on="id",
    how="left",
)

# Ship the item factors to the executors once, then score every user with them.
broadcast_items_factors = spark.sparkContext.broadcast(dfp_items_factors)
recommend_udf = build_recommendations(broadcast_items_factors)
dfs_recommendations = dfs_recommendations.withColumn(
    "recommendations_raw",
    recommend_udf(F.col("features"), F.col("inventory_itemid_encoded")),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment