from abc import ABC, abstractmethod
from typing import Iterator, Tuple

import catboost
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from scipy.sparse import coo_matrix, csr_matrix
from tqdm.auto import tqdm


class SparseExplicitALS:
    """
    Memory-efficient explicit ALS for ratings data.

    Minimizes: Σ_{(u,i) observed} (r_ui - u_u^T v_i)^2 + λ(||U||^2 + ||V||^2)
    only over observed ratings (the ratings matrix stays sparse throughout).
    """

    def __init__(self, n_factors=50, n_iterations=10, reg_lambda=0.01, n_jobs=-1):
        """
        Args:
            n_factors: Number of latent factors
            n_iterations: Number of ALS iterations
            reg_lambda: L2 regularization parameter
            n_jobs: Number of parallel jobs (-1 = all cores)
        """
        self.n_factors = n_factors
        self.n_iterations = n_iterations
        self.reg_lambda = reg_lambda
        self.n_jobs = n_jobs
        self.user_factors = None
        self.item_factors = None

    def fit(self, ratings_sparse):
        """
        Train on a sparse ratings matrix WITHOUT densifying.

        Args:
            ratings_sparse: scipy.sparse matrix (n_users x n_items)
                containing ratings only for observed entries
        """
        # Ensure CSR format for efficient row access
        ratings_csr = csr_matrix(ratings_sparse)
        n_users, n_items = ratings_csr.shape

        # Also create CSC format for efficient column access (items)
        ratings_csc = ratings_csr.tocsc()

        # Initialize factors with small random values
        self.user_factors = np.random.normal(0, 0.1, (n_users, self.n_factors))
        self.item_factors = np.random.normal(0, 0.1, (n_items, self.n_factors))

        # Training loop
        for iteration in range(self.n_iterations):
            # Fix items, update users (process rows)
            self.user_factors = self._update_factors_parallel(
                fixed_factors=self.item_factors,
                ratings_sparse=ratings_csr,
            )

            # Fix users, update items (process columns via the transposed matrix in CSR form)
            self.item_factors = self._update_factors_parallel(
                fixed_factors=self.user_factors,
                ratings_sparse=ratings_csc.T.tocsr(),
            )

            # Compute and print loss every other iteration
            if iteration % 2 == 0:
                loss = self._compute_loss_sparse(ratings_csr)
                print(f"Iteration {iteration}: Loss = {loss:.4f}")

        return self

    def _update_single_entity(self, entity_idx, fixed_factors, ratings_sparse, reg_eye, YTY):
        """
        Update factors for a single user/item.

        For user u with rated items I_u:
            u = (V_{I_u}^T V_{I_u} + λI)^{-1} V_{I_u}^T r_u
        where V_{I_u} are the item factors for the items the user rated.
        """
        # Get sparse ratings for this entity (row slice via CSR index pointers)
        start_idx = ratings_sparse.indptr[entity_idx]
        end_idx = ratings_sparse.indptr[entity_idx + 1]

        # No ratings - return a zero vector
        if start_idx == end_idx:
            return np.zeros(self.n_factors)

        # Indices and values of rated items (never densify!)
        rated_indices = ratings_sparse.indices[start_idx:end_idx]
        rating_values = ratings_sparse.data[start_idx:end_idx]

        # Get factor vectors for rated items only
        V_rated = fixed_factors[rated_indices]  # Shape: (n_rated, n_factors)

        # Solve: (V^T V + λI) x = V^T r, where V contains only rows for rated items
        A = V_rated.T @ V_rated + reg_eye
        b = V_rated.T @ rating_values
        return np.linalg.solve(A, b)

    def _update_factors_parallel(self, fixed_factors, ratings_sparse):
        """
        Update all user/item factors in parallel.

        Args:
            fixed_factors: The factors being held constant (items when updating users)
            ratings_sparse: Sparse ratings in CSR format
        """
        n_entities = ratings_sparse.shape[0]
        n_factors = fixed_factors.shape[1]

        # Regularization term (precompute once)
        reg_eye = self.reg_lambda * np.eye(n_factors)

        # Precompute Y^T Y (not needed for the explicit-feedback update, kept for potential optimization)
        YTY = fixed_factors.T @ fixed_factors

        # Parallel computation across all entities (threading avoids copying the factor matrices)
        new_factors = Parallel(n_jobs=self.n_jobs, backend='threading')(
            delayed(self._update_single_entity)(
                entity_idx, fixed_factors, ratings_sparse, reg_eye, YTY
            )
            for entity_idx in range(n_entities)
        )
        return np.array(new_factors)

    def _compute_loss_sparse(self, ratings_sparse):
        """
        Compute training loss WITHOUT densifying.

        Loss = Σ_{(u,i) observed} (r_ui - u_u^T v_i)^2 + λ(||U||^2 + ||V||^2)
        """
        squared_error = 0.0

        # Iterate over non-zero entries only
        for u in range(ratings_sparse.shape[0]):
            start_idx = ratings_sparse.indptr[u]
            end_idx = ratings_sparse.indptr[u + 1]
            if start_idx == end_idx:
                continue

            item_indices = ratings_sparse.indices[start_idx:end_idx]
            true_ratings = ratings_sparse.data[start_idx:end_idx]

            # Predict ratings for this user's rated items
            predicted = self.user_factors[u] @ self.item_factors[item_indices].T

            # Squared error
            squared_error += np.sum((true_ratings - predicted) ** 2)

        # Regularization
        reg_loss = self.reg_lambda * (
            np.sum(self.user_factors ** 2) +
            np.sum(self.item_factors ** 2)
        )
        return squared_error + reg_loss

    def predict(self, user_id=None, item_ids=None):
        """
        Predict ratings for user(s) and item(s).

        Args:
            user_id: User index or array of user indices
            item_ids: Item indices (if None, predicts all items)
        Returns:
            Predicted rating(s)
        """
        if item_ids is None:
            if user_id is not None:
                # Predict all items for the given user(s)
                return self.user_factors[user_id] @ self.item_factors.T
            else:
                # Full user-item score matrix
                return self.user_factors @ self.item_factors.T
        else:
            # Predict specific items
            if np.isscalar(user_id) and np.isscalar(item_ids):
                return self.user_factors[user_id] @ self.item_factors[item_ids]
            elif np.isscalar(user_id):
                return self.user_factors[user_id] @ self.item_factors[item_ids].T
            else:
                # Paired arrays of users and items
                return np.sum(
                    self.user_factors[user_id] * self.item_factors[item_ids],
                    axis=1,
                )

    def recommend(self, user_id, known_items, k=10):
        """
        Get top-k recommendations for a user.

        Args:
            user_id: User index
            known_items: Indices of items the user has already rated
            k: Number of recommendations
        """
        # Predict scores for all items
        scores = self.predict(user_id)

        # Mask already rated items
        scores[known_items] = -np.inf

        # Efficient top-k via argpartition, then sort only the top k by score
        top_k_indices = np.argpartition(scores, -k)[-k:]
        top_k_indices = top_k_indices[np.argsort(scores[top_k_indices])[::-1]]

        return top_k_indices, scores[top_k_indices]

    def evaluate(self, test_ratings_sparse):
        """
        Evaluate RMSE on a test set (stays sparse).

        Args:
            test_ratings_sparse: Sparse test ratings matrix
        """
        squared_errors = []
        test_csr = csr_matrix(test_ratings_sparse)

        for u in range(test_csr.shape[0]):
            start_idx = test_csr.indptr[u]
            end_idx = test_csr.indptr[u + 1]
            if start_idx == end_idx:
                continue

            item_indices = test_csr.indices[start_idx:end_idx]
            true_ratings = test_csr.data[start_idx:end_idx]

            # Predict only the observed test entries
            predicted = self.predict(u, item_indices)
            squared_errors.extend((true_ratings - predicted) ** 2)

        rmse = np.sqrt(np.mean(squared_errors))
        return rmse
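

def _example_sparse_als() -> None:
    """
    Minimal usage sketch for SparseExplicitALS on a tiny made-up ratings matrix.
    The data below is purely illustrative; this helper is not part of the pipeline
    and is never called by the script.
    """
    # 3 users x 4 items, explicit ratings only where observed
    rows = np.array([0, 0, 1, 2, 2])
    cols = np.array([0, 2, 1, 0, 3])
    vals = np.array([5.0, 3.0, 4.0, 1.0, 2.0])
    toy_ratings = coo_matrix((vals, (rows, cols)), shape=(3, 4)).tocsr()

    model = SparseExplicitALS(n_factors=2, n_iterations=4, reg_lambda=0.1, n_jobs=1)
    model.fit(toy_ratings)

    # Score a single (user, item) pair and fetch the top-2 unseen items for user 0
    print(model.predict(0, 2))
    print(model.recommend(user_id=0, known_items=np.array([0, 2]), k=2))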


class SparseExplicitALSOptimized(SparseExplicitALS):
    """
    Variant that solves each normal-equation system via a Cholesky factorization,
    with a fallback to a plain solve for better numerical robustness.
    """

    def _update_single_entity(self, entity_idx, fixed_factors, ratings_sparse, reg_eye, YTY):
        """
        Update using a Cholesky decomposition of A = V^T V + λI.
        """
        start_idx = ratings_sparse.indptr[entity_idx]
        end_idx = ratings_sparse.indptr[entity_idx + 1]
        if start_idx == end_idx:
            return np.zeros(self.n_factors)

        rated_indices = ratings_sparse.indices[start_idx:end_idx]
        rating_values = ratings_sparse.data[start_idx:end_idx]
        V_rated = fixed_factors[rated_indices]

        # Build the system A x = b
        A = V_rated.T @ V_rated + reg_eye
        b = V_rated.T @ rating_values

        # Cholesky factorization for the symmetric positive definite system:
        # A = L L^T, then solve L y = b followed by L^T x = y
        try:
            L = np.linalg.cholesky(A)
            y = np.linalg.solve(L, b)
            x = np.linalg.solve(L.T, y)
            return x
        except np.linalg.LinAlgError:
            # Fall back to a regular solve if A is not positive definite
            return np.linalg.solve(A, b)
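

def _example_cholesky_update() -> None:
    """
    Tiny illustrative check (not called by the script): the two-step Cholesky
    solve used above gives the same answer as a direct solve on a small SPD system.
    """
    rng = np.random.default_rng(0)
    V = rng.normal(size=(6, 3))
    r = rng.normal(size=6)
    A = V.T @ V + 0.1 * np.eye(3)  # symmetric positive definite
    b = V.T @ r

    L = np.linalg.cholesky(A)
    x_chol = np.linalg.solve(L.T, np.linalg.solve(L, b))
    x_direct = np.linalg.solve(A, b)
    assert np.allclose(x_chol, x_direct)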


class EASE:
    """
    Embarrassingly Shallow Autoencoder (EASE): closed-form item-item model,
    B = argmin ||X - XB||^2 + λ||B||^2 subject to diag(B) = 0.
    """

    def __init__(self, reg_lambda: float = 500.0) -> None:
        self.reg_lambda = reg_lambda
        self.b_matrix: np.ndarray | None = None

    def fit(self, user_item: coo_matrix) -> 'EASE':
        n_items = user_item.shape[1]

        # Gram matrix G = X^T X; its regularized inverse is dense anyway, so densify here
        gram_matrix = (user_item.transpose() @ user_item).toarray().astype(np.float64)
        diag_indices = np.diag_indices(n_items)
        gram_matrix[diag_indices] += self.reg_lambda

        p_matrix = np.linalg.inv(gram_matrix)

        # Closed-form solution: B_ij = -P_ij / P_jj, with a zero diagonal
        b_matrix = p_matrix / (-np.diag(p_matrix))
        b_matrix[diag_indices] = 0.0

        self.b_matrix = b_matrix
        return self

    def predict(self, user_item: coo_matrix) -> np.ndarray:
        # Dense score matrix; mask items each user has already interacted with
        scores = np.asarray(user_item @ self.b_matrix)
        scores[user_item.nonzero()] = -np.inf
        return scores

    def recommend(self, user_id: int, user_item: coo_matrix, k: int = 10):
        scores = self.predict(user_item)
        user_scores = scores[user_id]
        topk_items = np.argsort(user_scores)[::-1][:k]
        topk_scores = user_scores[topk_items]
        return topk_items, topk_scores
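

def _example_ease() -> None:
    """
    Minimal usage sketch for EASE on a tiny made-up binary interaction matrix.
    Illustrative only; not called by the pipeline.
    """
    rows = np.array([0, 0, 1, 1, 2])
    cols = np.array([0, 1, 1, 2, 0])
    vals = np.ones(5)
    toy_interactions = coo_matrix((vals, (rows, cols)), shape=(3, 3))

    ease = EASE(reg_lambda=10.0).fit(toy_interactions)
    items, scores = ease.recommend(user_id=0, user_item=toy_interactions, k=1)
    print(items, scores)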


def read_data(
    recommendations_path: str = '~/Downloads/recommendations.csv',
    games_path: str = '~/Downloads/games.csv',
    users_path: str = '~/Downloads/users.csv',
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    # Keep one year of review data
    recommendations = (
        pd.read_csv(recommendations_path)
        .query("'2014-01-01' < date < '2015-01-01'")
    )

    # Re-index app ids to a contiguous 0..n_items-1 range
    app_id = set(recommendations['app_id'].unique())
    games = (
        pd.read_csv(games_path)
        .loc[lambda x: x['app_id'].isin(app_id)]
        .reset_index(drop=True)
        .reset_index()
        .rename(columns={'app_id': 'old_app_id'})
        .rename(columns={'index': 'app_id'})
        .set_index('old_app_id')
    )
    games_idx = games['app_id'].to_dict()

    # Re-index user ids to a contiguous 0..n_users-1 range
    user_id = set(recommendations['user_id'].unique())
    users = (
        pd.read_csv(users_path)
        .loc[lambda x: x['user_id'].isin(user_id)]
        .reset_index(drop=True)
        .reset_index()
        .rename(columns={'user_id': 'old_user_id'})
        .rename(columns={'index': 'user_id'})
        .set_index('old_user_id')
    )
    users_idx = users['user_id'].to_dict()

    # Map the interaction log onto the new contiguous ids
    recommendations = (
        recommendations
        .assign(
            date=lambda x: pd.to_datetime(x['date']),
            app_id=lambda x: x['app_id'].map(games_idx),
            user_id=lambda x: x['user_id'].map(users_idx),
        )
    )

    # Possible graded relevance labels (unused for now):
    # recommendations['label'] = 0
    # recommendations.loc[recommendations['hours_played'] > 0, 'label'] = 1
    # recommendations.loc[recommendations['hours_played'] > 2, 'label'] = 2
    # recommendations.loc[recommendations['is_recommended'] == True, 'label'] = 3
    return recommendations, games, users
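

def _example_reindex() -> None:
    """
    Tiny illustration (not called anywhere) of the re-indexing trick used in
    read_data: arbitrary ids are mapped to a contiguous 0..n-1 range so they
    can be used directly as sparse-matrix row/column indices. The app ids
    below are made up.
    """
    games = (
        pd.DataFrame({'app_id': [730, 570, 440]})
        .reset_index(drop=True)
        .reset_index()
        .rename(columns={'app_id': 'old_app_id'})
        .rename(columns={'index': 'app_id'})
        .set_index('old_app_id')
    )
    games_idx = games['app_id'].to_dict()
    assert games_idx == {730: 0, 570: 1, 440: 2}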


def read_movielens(
    users_path: str = '~/Downloads/ml-10M100K/users.dat',
    ratings_path: str = '~/Downloads/ml-10M100K/ratings.dat',
    movies_path: str = '~/Downloads/ml-10M100K/movies.dat',
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    user_cols = ['user_id', 'gender', 'age', 'occupation', 'zip']
    rating_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
    movie_cols = ['movie_id', 'title', 'genres']

    # MovieLens .dat files are '::'-separated without headers
    users = pd.read_csv(users_path, sep='::', header=None, names=user_cols, engine='python')
    ratings = pd.read_csv(ratings_path, sep='::', header=None, names=rating_cols, engine='python')
    movies = pd.read_csv(movies_path, sep='::', header=None, names=movie_cols, engine='python')
    return users, ratings, movies


def recall_at_k(cands: dict[int, list[int]], target: dict[int, set[int]], k: int = 10) -> float:
    """
    :param cands: Dict[user_id, List[app_id]] - ranked candidate items per user
    :param target: Dict[user_id, Set[app_id]] - relevant items per user
    :param k: metric threshold
    :return: mean recall at k over the users in cands
    """
    recalls_num = 0.0
    recalls_denom = 0
    for user_id, app_ids in tqdm(cands.items()):
        app_ids = set(app_ids[:k])
        target_apps = target[user_id]
        intersect = target_apps & app_ids
        recalls_num += len(intersect) / len(target_apps)
        recalls_denom += 1
    return recalls_num / recalls_denom
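

def _example_recall_at_k() -> None:
    """
    Worked example for recall_at_k (illustrative only; not called by the script).
    User 1 recovers 1 of 2 relevant items in its top-2, user 2 recovers 1 of 1,
    so recall@2 = (0.5 + 1.0) / 2 = 0.75.
    """
    cands = {1: [10, 20, 30], 2: [40, 50]}
    target = {1: {20, 99}, 2: {40}}
    assert recall_at_k(cands, target, k=2) == 0.75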


class CandGen(ABC):
    """Abstract candidate generator interface."""

    @abstractmethod
    def get(self, user_id: int) -> Iterator[int]:
        raise NotImplementedError

    @abstractmethod
    def has(self, user_id: int) -> bool:
        raise NotImplementedError


class DataCandGen(CandGen):
    """Serves precomputed per-user candidate lists."""

    def __init__(self, cands: dict[int, list[int]]) -> None:
        self.cands = cands

    def get(self, user_id: int) -> Iterator[int]:
        yield from self.cands.get(user_id, [])

    def has(self, user_id: int) -> bool:
        return user_id in self.cands


class ConstantCandGen(CandGen):
    """Serves the same item list (e.g. most popular items) to every user."""

    def __init__(self, items: list[int]) -> None:
        self.items = items

    def get(self, user_id: int) -> Iterator[int]:
        yield from self.items

    def has(self, user_id: int) -> bool:
        return True


class FallbackCandGen(CandGen):
    """Tries each generator in order and uses the first one that covers the user."""

    def __init__(self, *generators: CandGen) -> None:
        self.generators = list(generators)

    def get(self, user_id: int) -> Iterator[int]:
        for gen in self.generators:
            if gen.has(user_id):
                yield from gen.get(user_id)
                break

    def has(self, user_id: int) -> bool:
        return any(gen.has(user_id) for gen in self.generators)


def generate_log(cand_gen: CandGen, users: list[int]) -> pd.DataFrame:
    """Materialize (user_id, app_id) candidate pairs for a set of users."""
    return pd.DataFrame.from_records(
        [
            {'user_id': user_id, 'app_id': app_id}
            for user_id in users
            for app_id in cand_gen.get(user_id)
        ]
    )
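

def _example_cand_gen() -> None:
    """
    Minimal usage sketch for the candidate generators (illustrative only;
    the ids below are made up and this helper is never called).
    """
    personalized = DataCandGen(cands={0: [5, 7], 1: [3]})
    popular_fallback = ConstantCandGen(items=[1, 2])
    cand_gen = FallbackCandGen(personalized, popular_fallback)

    assert list(cand_gen.get(0)) == [5, 7]   # covered by the personalized generator
    assert list(cand_gen.get(42)) == [1, 2]  # unknown user falls back to popular items

    log = generate_log(cand_gen, users=[0, 42])
    print(log)  # columns: user_id, app_id - one row per candidate pair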


if __name__ == '__main__':
    recommendations, games, users = read_data()

    # Temporal split: the last two months of 2014 form the test period
    train = recommendations.query("date < '2014-11-01'")
    test = recommendations.query("date >= '2014-11-01'")

    train_sparse = coo_matrix(
        (train['is_recommended'].astype(float), (train['user_id'], train['app_id']))
    ).tocsr()
    test_sparse = coo_matrix(
        (test['is_recommended'].astype(float), (test['user_id'], test['app_id']))
    ).tocsr()

    topk = 50

    # Popularity fallback: the most recommended items in the training period
    most_popular = train[train['is_recommended']]['app_id'].value_counts(ascending=False)[:topk].index.tolist()

    import os
    os.environ['MKL_NUM_THREADS'] = "1"  # set before implicit loads its BLAS backend
    from implicit.als import AlternatingLeastSquares

    als = AlternatingLeastSquares(factors=40)
    als.fit(train_sparse)
    als_recommendations = als.recommend_all(train_sparse)

    test_users = test['user_id'][test['is_recommended']].unique().tolist()
    ground_truth = (
        test[test['is_recommended']]
        .groupby('user_id')
        .agg({'app_id': lambda x: set(x)})['app_id']
        .to_dict()
    )

    # ALS recommendations for users seen in training, popularity fallback otherwise
    preds = {}
    train_users = set(train['user_id'].unique())
    for user in ground_truth.keys():
        if user in train_users:
            preds[user] = list(als_recommendations[user])
        else:
            preds[user] = most_popular

    print(recall_at_k(
        cands=preds,
        target=ground_truth,
        k=topk,
    ))

    # Candidate log for a downstream ranker: ALS candidates with a popularity fallback
    cand_gen = FallbackCandGen(
        DataCandGen(cands={user_id: list(als_recommendations[user_id]) for user_id in train_users}),
        ConstantCandGen(items=most_popular),
    )
    ranker_train_log = generate_log(cand_gen, list(train_users))
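
    # One possible next step (a sketch, not part of the original pipeline): join labels
    # onto the candidate log and fit a CatBoost ranker grouped by user. The 'label'
    # column and the single placeholder feature below are assumptions for illustration only.
    # labelled_log = ranker_train_log.merge(
    #     train[['user_id', 'app_id', 'is_recommended']], on=['user_id', 'app_id'], how='left'
    # ).assign(label=lambda x: x['is_recommended'].fillna(False).astype(int))
    # ranker = catboost.CatBoostRanker(loss_function='YetiRank', iterations=100, verbose=False)
    # ranker.fit(
    #     labelled_log[['app_id']],   # placeholder features; real user/item features would go here
    #     labelled_log['label'],
    #     group_id=labelled_log['user_id'],
    # )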