Skip to content

Instantly share code, notes, and snippets.

@myui
Created November 29, 2024 06:30
Show Gist options
  • Save myui/4d9fb081596e51463950c88e4f24cef0 to your computer and use it in GitHub Desktop.
Save myui/4d9fb081596e51463950c88e4f24cef0 to your computer and use it in GitHub Desktop.
import warnings
from typing import List, Optional

import numpy as np
import scipy.sparse as sp
from numpy.typing import ArrayLike
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import ElasticNet
from tqdm import tqdm
class ColumnarView:
    """
    Column-oriented accessor over a CSR matrix that shares its data buffer.

    The wrapped matrix is never copied: ``get_col`` reads from and ``set_col``
    writes into ``csr_matrix.data`` directly, so changes are visible to every
    holder of the original matrix.

    Parameters
    ----------
    csr_matrix : scipy.sparse.csr_matrix
        The matrix to wrap. Its sparsity structure must not change while the
        view is in use, because the column lookup is computed once here.

    Attributes
    ----------
    csr_matrix : scipy.sparse.csr_matrix
        The wrapped matrix (same object that was passed in).
    col_view : scipy.sparse.csc_matrix
        CSC matrix with the same sparsity pattern whose stored values are the
        positions of the corresponding entries in ``csr_matrix.data``.
    """

    def __init__(self, csr_matrix: sp.csr_matrix):
        self.csr_matrix = csr_matrix
        # Build the index matrix from the existing structure arrays instead of
        # deep-copying the whole matrix only to overwrite its data. The stored
        # values are positions into ``csr_matrix.data``.
        positions = sp.csr_matrix(
            (
                np.arange(len(csr_matrix.data)),
                csr_matrix.indices,
                csr_matrix.indptr,
            ),
            shape=csr_matrix.shape,
        )
        # Converting to CSC groups those positions by column.
        self.col_view = positions.tocsc()

    @property
    def shape(self) -> tuple:
        """Shape of the wrapped matrix."""
        return self.csr_matrix.shape

    def _col_bounds(self, j: int) -> tuple:
        """Return the ``(start, stop)`` range of column ``j`` in ``col_view``."""
        # Indexing a range normalises negative indices, raises IndexError when
        # out of bounds and rejects non-integer indices.
        col = range(self.shape[1])[j]
        indptr = self.col_view.indptr
        return int(indptr[col]), int(indptr[col + 1])

    def get_col(self, j: int) -> sp.csc_matrix:
        """
        Return the j-th column of the matrix.

        Parameters
        ----------
        j : int
            The column index.

        Returns
        -------
        scipy.sparse.csc_matrix
            The j-th column as an ``(n_rows, 1)`` matrix. Its ``data`` is a
            copy of the current values, in the same order ``set_col`` expects.
        """
        start, stop = self._col_bounds(j)
        # Fancy indexing copies, so the result is detached from the matrix.
        values = self.csr_matrix.data[self.col_view.data[start:stop]]
        rows = self.col_view.indices[start:stop].copy()
        return sp.csc_matrix(
            (values, rows, np.array([0, stop - start])),
            shape=(self.shape[0], 1),
        )

    def set_col(self, j: int, values: ArrayLike) -> None:
        """
        Set the stored entries of the j-th column to the given values.

        Only entries already present in the sparsity pattern are written;
        the structure of the matrix is not changed.

        Parameters
        ----------
        j : int
            The column index.
        values : ArrayLike
            The new values for the column's stored entries, ordered like
            ``get_col(j).data`` (or a scalar to broadcast).
        """
        start, stop = self._col_bounds(j)
        self.csr_matrix.data[self.col_view.data[start:stop]] = values
# Look up which columns hold stored entries in one row of a CSR matrix
def get_column_indexes(csr_matrix: sp.csr_matrix, row_index: int) -> List[int]:
    """
    Return the column indexes of the entries stored in one row of a CSR matrix.

    Args:
        csr_matrix (csr_matrix): The CSR matrix to inspect.
        row_index (int): The row whose stored column indexes are wanted.

    Returns:
        List[int]: Column indexes of the entries stored for that row.
    """
    row_pointers = csr_matrix.indptr
    # indptr[i]..indptr[i + 1] delimits row i inside the indices array
    row_span = slice(row_pointers[row_index], row_pointers[row_index + 1])
    return csr_matrix.indices[row_span].tolist()
class SLIMElastic:
    """
    SLIMElastic is a sparse linear method for top-K recommendation, which learns
    a sparse aggregation coefficient matrix by solving an L1-norm and L2-norm
    regularized optimization problem.

    One ElasticNet regression is fitted per item: the item's column is the
    target and all other columns are the features. The learned coefficients
    form column ``j`` of ``item_similarity``.
    """

    def __init__(self, config: Optional[dict] = None):
        """
        Initialize the SLIMElastic model.

        Args:
            config (dict, optional): Hyper-parameters. Recognized keys:
                alpha (float): Regularization strength. Default 0.01.
                l1_ratio (float): The ratio between L1 and L2 regularization.
                    Default 0.5.
                positive_only (bool): Whether to enforce positive coefficients.
                    Default True.
                max_iter (int): Maximum ElasticNet iterations. Default 30.
                tol (float): ElasticNet optimization tolerance. Default 1e-4.
                random_state (int): Seed for the ElasticNet solver. Default 43.
        """
        # A None default avoids sharing one mutable dict across all instances.
        config = {} if config is None else config
        self.alpha = config.get("alpha", 0.01)
        self.l1_ratio = config.get("l1_ratio", 0.5)
        self.positive_only = config.get("positive_only", True)
        self.max_iter = config.get("max_iter", 30)
        self.tol = config.get("tol", 1e-4)
        self.random_state = config.get("random_state", 43)
        # Initialize an empty item similarity matrix (will be computed during fit)
        self.item_similarity = None

    @staticmethod
    def _check_interaction_matrix(interaction_matrix) -> None:
        """Raise ValueError unless the input is a scipy CSR matrix."""
        if not isinstance(interaction_matrix, sp.csr_matrix):
            raise ValueError("Interaction matrix must be a scipy.sparse.csr_matrix of user-item interactions.")

    def _build_model(self) -> "ElasticNet":
        """Create the ElasticNet regressor configured from this instance."""
        return ElasticNet(
            alpha=self.alpha,  # Regularization strength
            l1_ratio=self.l1_ratio,
            positive=self.positive_only,  # Enforce positive coefficients
            fit_intercept=False,
            copy_X=False,  # Avoid copying the input matrix
            precompute=True,  # Precompute Gram matrix for faster computation
            max_iter=self.max_iter,
            tol=self.tol,
            selection='random',  # Randomize the order of features
            random_state=self.random_state,
        )

    def _fit_columns(self, X: "ColumnarView", items, desc: Optional[str] = None) -> None:
        """Fit one regression per item in ``items`` and store its coefficients."""
        model = self._build_model()
        # Ignore convergence warnings for ElasticNet
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=ConvergenceWarning)
            for j in tqdm(items, desc=desc):
                # Target column (current item)
                y = X.get_col(j)
                # Zero the target column so the item cannot predict itself
                X.set_col(j, np.zeros_like(y.data))
                try:
                    model.fit(X.csr_matrix, y.toarray().ravel())
                finally:
                    # Always restore the caller's matrix, even if fitting fails
                    X.set_col(j, y.data)
                # Column j holds the weights of every item for predicting item j
                self.item_similarity[:, j] = model.coef_

    def fit(self, interaction_matrix: sp.csr_matrix):
        """
        Fit the SLIMElastic model to the interaction matrix.

        The matrix is modified in place temporarily during training (one column
        at a time is zeroed) and restored afterwards.

        Args:
            interaction_matrix (csr_matrix): User-item interaction matrix (sparse).

        Returns:
            SLIMElastic: This instance.

        Raises:
            ValueError: If the input is not a scipy.sparse.csr_matrix.
        """
        self._check_interaction_matrix(interaction_matrix)
        X = ColumnarView(interaction_matrix)
        num_items = X.shape[1]
        self.item_similarity = np.zeros((num_items, num_items))  # Initialize similarity matrix
        self._fit_columns(X, range(num_items), desc="Fitting SLIMElastic")
        return self

    def partial_fit(self, interaction_matrix: sp.csr_matrix, user_ids: List[int]):
        """
        Incrementally refit every item that the given users interacted with.

        Args:
            interaction_matrix (csr_matrix): User-item interaction matrix (sparse).
            user_ids (list): Row indices of the users whose items are refitted.

        Returns:
            SLIMElastic: This instance.

        Raises:
            ValueError: If the input is not a scipy.sparse.csr_matrix.
            RuntimeError: If the model has not been fitted yet.
        """
        self._check_interaction_matrix(interaction_matrix)
        user_items = set()
        for user_id in user_ids:
            user_items.update(interaction_matrix[user_id].indices.tolist())
        # Sorted so the refit order does not depend on set iteration order
        return self.partial_fit_items(interaction_matrix, sorted(user_items))

    def partial_fit_items(self, interaction_matrix: sp.csr_matrix, updated_items: List[int]):
        """
        Incrementally fit the SLIMElastic model with new or updated items.

        Args:
            interaction_matrix (csr_matrix): User-item interaction matrix (sparse).
            updated_items (list): List of item indices that were updated.

        Returns:
            SLIMElastic: This instance.

        Raises:
            ValueError: If the input is not a scipy.sparse.csr_matrix, or if it
                has fewer item columns than the fitted model.
            RuntimeError: If the model has not been fitted yet.
        """
        self._check_interaction_matrix(interaction_matrix)
        if self.item_similarity is None:
            raise RuntimeError("Model must be fitted before calling partial_fit_items.")
        updated_items = list(updated_items)
        if not updated_items:
            # Nothing to refit
            return self

        old_size = self.item_similarity.shape[1]
        num_items = interaction_matrix.shape[1]
        if num_items < old_size:
            raise ValueError(
                f"Interaction matrix has {num_items} items but the fitted model has {old_size}."
            )
        if num_items > old_size:
            # Each regression yields one coefficient per matrix column, so the
            # similarity matrix must grow to the full item count (not just to
            # the largest updated index). Expand rows and columns symmetrically.
            expanded_similarity = np.zeros((num_items, num_items))
            expanded_similarity[:old_size, :old_size] = self.item_similarity
            self.item_similarity = expanded_similarity

        X = ColumnarView(interaction_matrix)
        self._fit_columns(X, updated_items)
        return self

    def predict(self, user_id: int, interaction_matrix: sp.csr_matrix):
        """
        Compute the predicted scores for a specific user across all items.

        Args:
            user_id (int): The user ID (row index in interaction_matrix).
            interaction_matrix (csr_matrix): User-item interaction matrix.

        Returns:
            numpy.ndarray: Predicted scores for the user across all items.

        Raises:
            RuntimeError: If the model has not been fitted yet.
        """
        if self.item_similarity is None:
            raise RuntimeError("Model must be fitted before calling predict.")
        # Dot product of the user's interaction row with the item similarity matrix
        return interaction_matrix[user_id].dot(self.item_similarity)

    def predict_all(self, interaction_matrix: sp.csr_matrix):
        """
        Compute the predicted scores for all users and items.

        Args:
            interaction_matrix (csr_matrix): User-item interaction matrix.

        Returns:
            numpy.ndarray: Predicted scores for all users and items.

        Raises:
            RuntimeError: If the model has not been fitted yet.
        """
        if self.item_similarity is None:
            raise RuntimeError("Model must be fitted before calling predict_all.")
        # Dot product of the whole interaction matrix with the item similarity matrix
        return interaction_matrix.dot(self.item_similarity)

    def recommend(self, user_id: int, interaction_matrix: sp.csr_matrix, top_k: int = 10, exclude_seen: bool = True) -> np.ndarray:
        """
        Recommend top-K items for a given user.

        Args:
            user_id (int): ID of the user (row index in interaction_matrix).
            interaction_matrix (csr_matrix): User-item interaction matrix (sparse).
            top_k (int): Number of recommendations to return. Values <= 0 yield
                an empty result.
            exclude_seen (bool): Whether to exclude items the user has already interacted with.

        Returns:
            numpy.ndarray: Recommended item indices, best first. Fewer than
            top_k items are returned when not enough candidates exist.

        Raises:
            RuntimeError: If the model has not been fitted yet.
        """
        # Get predicted scores for all items for the given user
        user_scores = self.predict(user_id, interaction_matrix).ravel()
        if top_k <= 0:
            # Guard: a [-0:] slice would select every item instead of none
            return np.empty(0, dtype=np.intp)

        seen_items = interaction_matrix[user_id].indices
        if exclude_seen:
            # Push seen items to the bottom of the ranking
            user_scores[seen_items] = -np.inf

        # Ascending argsort; take the last top_k and reverse for best-first order
        top_items = np.argsort(user_scores)[-top_k:][::-1]
        if exclude_seen:
            # When top_k exceeds the number of unseen items, seen ones would
            # otherwise leak into the tail of the result.
            top_items = top_items[~np.isin(top_items, seen_items)]
        return top_items
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment