from typing import Tuple, Iterator
from abc import ABC, abstractmethod


import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, coo_matrix
from scipy.linalg import cho_factor, cho_solve
from joblib import Parallel, delayed
from tqdm.auto import tqdm
import catboost  # used by the second-stage ranker sketch at the bottom


class SparseExplicitALS:
    """
    Memory-efficient explicit ALS for ratings data.

    Minimizes: Σ_{(u,i) observed} (r_ui - u_u^T v_i)^2 + λ(||U||^2 + ||V||^2)
    over the observed ratings only (the data stays sparse throughout).
    """

    def __init__(self, n_factors=50, n_iterations=10, reg_lambda=0.01, n_jobs=-1):
        """
        Args:
            n_factors: Number of latent factors
            n_iterations: Number of ALS iterations
            reg_lambda: L2 regularization strength
            n_jobs: Number of parallel jobs (-1 = all cores)
        """
        self.n_factors = n_factors
        self.n_iterations = n_iterations
        self.reg_lambda = reg_lambda
        self.n_jobs = n_jobs
        self.user_factors = None
        self.item_factors = None

    def fit(self, ratings_sparse):
        """
        Train on a sparse ratings matrix WITHOUT densifying.

        Args:
            ratings_sparse: scipy.sparse matrix (n_users x n_items)
                            containing ratings only for observed entries
        """
        # Ensure CSR format for efficient row access
        ratings_csr = csr_matrix(ratings_sparse)
        n_users, n_items = ratings_csr.shape

        # Also create CSC format for efficient column access (items)
        ratings_csc = ratings_csr.tocsc()

        # Initialize factors with small random values
        self.user_factors = np.random.normal(0, 0.1, (n_users, self.n_factors))
        self.item_factors = np.random.normal(0, 0.1, (n_items, self.n_factors))

        # Training loop
        for iteration in range(self.n_iterations):
            # Fix items, update users (process rows)
            self.user_factors = self._update_factors_parallel(
                fixed_factors=self.item_factors,
                ratings_sparse=ratings_csr
            )

            # Fix users, update items (the transposed CSC is CSR, giving
            # efficient per-item row access)
            self.item_factors = self._update_factors_parallel(
                fixed_factors=self.user_factors,
                ratings_sparse=ratings_csc.T.tocsr()
            )

            # Compute and print the loss every other iteration
            if iteration % 2 == 0:
                loss = self._compute_loss_sparse(ratings_csr)
                print(f"Iteration {iteration}: Loss = {loss:.4f}")

        return self

    def _update_single_entity(self, entity_idx, fixed_factors, ratings_sparse, reg_eye, YTY):
        """
        Update the factor vector of a single user/item.

        For user u with rated items I_u:
        u = (V_{I_u}^T V_{I_u} + λI)^{-1} V_{I_u}^T r_u

        where V_{I_u} are the factor rows of the items this user rated.
        """
        # Locate this entity's ratings in the CSR index arrays
        start_idx = ratings_sparse.indptr[entity_idx]
        end_idx = ratings_sparse.indptr[entity_idx + 1]

        # No ratings: return a zero vector
        if start_idx == end_idx:
            return np.zeros(self.n_factors)

        # Indices and values of rated items (never densify!)
        rated_indices = ratings_sparse.indices[start_idx:end_idx]
        rating_values = ratings_sparse.data[start_idx:end_idx]

        # Factor vectors for the rated items only
        V_rated = fixed_factors[rated_indices]  # shape: (n_rated, n_factors)

        # Solve (V^T V + λI) x = V^T r, where V holds only the rated rows
        A = V_rated.T @ V_rated + reg_eye
        b = V_rated.T @ rating_values

        return np.linalg.solve(A, b)

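    # Worked illustration (not in the original paste): the ridge system solved
    # above, on a hand-sized example -- 3 rated items, 2 latent factors. All
    # numbers here are made up for demonstration.
    @staticmethod
    def _demo_normal_equation() -> None:
        V = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])  # factors of rated items
        r = np.array([4.0, 2.0, 5.0])                       # the user's ratings
        lam = 0.1
        x = np.linalg.solve(V.T @ V + lam * np.eye(2), V.T @ r)
        print(x)  # this user's updated factor vector
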
    def _update_factors_parallel(self, fixed_factors, ratings_sparse):
        """
        Update all user/item factor vectors in parallel.

        Args:
            fixed_factors: The factors being held constant (items when updating users)
            ratings_sparse: Sparse ratings in CSR format
        """
        n_entities = ratings_sparse.shape[0]
        n_factors = fixed_factors.shape[1]

        # Regularization term (precomputed once)
        reg_eye = self.reg_lambda * np.eye(n_factors)

        # Precompute Y^T Y (not needed for explicit feedback, but kept for a
        # potential implicit-ALS-style optimization)
        YTY = fixed_factors.T @ fixed_factors

        # Parallel computation across all entities; the threading backend
        # shares the factor matrices instead of copying them per worker
        new_factors = Parallel(n_jobs=self.n_jobs, backend='threading')(
            delayed(self._update_single_entity)(
                entity_idx, fixed_factors, ratings_sparse, reg_eye, YTY
            )
            for entity_idx in range(n_entities)
        )

        return np.array(new_factors)

    def _compute_loss_sparse(self, ratings_sparse):
        """
        Compute the training loss WITHOUT densifying.

        Loss = Σ (r_ui - u_u^T v_i)^2 + λ(||U||^2 + ||V||^2)
        over the observed ratings only.
        """
        squared_error = 0.0

        # Iterate over non-zero entries only
        for u in range(ratings_sparse.shape[0]):
            start_idx = ratings_sparse.indptr[u]
            end_idx = ratings_sparse.indptr[u + 1]

            if start_idx == end_idx:
                continue

            item_indices = ratings_sparse.indices[start_idx:end_idx]
            true_ratings = ratings_sparse.data[start_idx:end_idx]

            # Predict ratings for this user's rated items
            predicted = self.user_factors[u] @ self.item_factors[item_indices].T

            # Squared error
            squared_error += np.sum((true_ratings - predicted) ** 2)

        # Regularization
        reg_loss = self.reg_lambda * (
            np.sum(self.user_factors ** 2) +
            np.sum(self.item_factors ** 2)
        )

        return squared_error + reg_loss

    def predict(self, user_id=None, item_ids=None):
        """
        Predict ratings for user(s) and item(s).

        Args:
            user_id: User index or array of user indices
            item_ids: Item indices (if None, predicts all items)

        Returns:
            Predicted rating(s)
        """
        if item_ids is None:
            if user_id is not None:
                # Predict all items for the given user(s)
                return self.user_factors[user_id] @ self.item_factors.T
            else:
                # Full user-item score matrix
                return self.user_factors @ self.item_factors.T
        else:
            # Predict specific items
            if np.isscalar(user_id) and np.isscalar(item_ids):
                return self.user_factors[user_id] @ self.item_factors[item_ids]
            elif np.isscalar(user_id):
                return self.user_factors[user_id] @ self.item_factors[item_ids].T
            else:
                # Paired (user, item) predictions
                return np.sum(
                    self.user_factors[user_id] * self.item_factors[item_ids],
                    axis=1
                )

    def recommend(self, user_id, known_items, k=10):
        """
        Get top-k recommendations for a user.

        Args:
            user_id: User index
            known_items: Indices of items the user has already rated
            k: Number of recommendations
        """
        # Predict scores for all items
        scores = self.predict(user_id)

        # Mask already-rated items
        scores[known_items] = -np.inf

        # Efficient top-k using argpartition
        top_k_indices = np.argpartition(scores, -k)[-k:]
        top_k_indices = top_k_indices[np.argsort(scores[top_k_indices])[::-1]]

        return top_k_indices, scores[top_k_indices]

    def evaluate(self, test_ratings_sparse):
        """
        Evaluate RMSE on a test set (stays sparse).

        Args:
            test_ratings_sparse: Sparse test ratings matrix
        """
        squared_errors = []

        test_csr = csr_matrix(test_ratings_sparse)

        for u in range(test_csr.shape[0]):
            start_idx = test_csr.indptr[u]
            end_idx = test_csr.indptr[u + 1]

            if start_idx == end_idx:
                continue

            item_indices = test_csr.indices[start_idx:end_idx]
            true_ratings = test_csr.data[start_idx:end_idx]

            # Predict ratings for the observed test entries
            predicted = self.predict(u, item_indices)

            squared_errors.extend((true_ratings - predicted) ** 2)

        rmse = np.sqrt(np.mean(squared_errors))
        return rmse


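# Illustrative usage sketch (not in the original paste): fits SparseExplicitALS
# on a tiny synthetic ratings matrix. Shapes, density, and hyperparameters are
# arbitrary assumptions for demonstration only.
def _demo_sparse_als() -> None:
    rng = np.random.default_rng(42)
    n_users, n_items = 200, 100
    # Sample 2000 distinct (user, item) pairs with ratings in 1..5
    flat = rng.choice(n_users * n_items, size=2000, replace=False)
    rows, cols = np.unravel_index(flat, (n_users, n_items))
    vals = rng.integers(1, 6, size=2000).astype(float)
    ratings = coo_matrix((vals, (rows, cols)), shape=(n_users, n_items)).tocsr()

    model = SparseExplicitALS(n_factors=16, n_iterations=6, reg_lambda=0.1)
    model.fit(ratings)

    # Top-5 recommendations for user 0, excluding items they already rated
    items, scores = model.recommend(user_id=0, known_items=ratings[0].indices, k=5)
    print(items, scores)

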
class SparseExplicitALSOptimized(SparseExplicitALS):
    """
    Variant with a faster, more numerically careful solver: since
    A = V^T V + λI is symmetric positive definite, each update can use a
    Cholesky factorization instead of a general LU solve.
    """

    def _update_single_entity(self, entity_idx, fixed_factors, ratings_sparse, reg_eye, YTY):
        """
        Optimized update via Cholesky decomposition.
        """
        start_idx = ratings_sparse.indptr[entity_idx]
        end_idx = ratings_sparse.indptr[entity_idx + 1]

        if start_idx == end_idx:
            return np.zeros(self.n_factors)

        rated_indices = ratings_sparse.indices[start_idx:end_idx]
        rating_values = ratings_sparse.data[start_idx:end_idx]

        V_rated = fixed_factors[rated_indices]

        # Build the system A x = b
        A = V_rated.T @ V_rated + reg_eye
        b = V_rated.T @ rating_values

        # cho_factor/cho_solve exploit symmetry and positive definiteness;
        # solving the triangular systems with a general solver would forfeit
        # the speed advantage of the Cholesky factorization
        try:
            c, low = cho_factor(A)
            return cho_solve((c, low), b)
        except np.linalg.LinAlgError:
            # Fall back to a general solve if A is not positive definite
            return np.linalg.solve(A, b)


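# Quick sanity check (not in the original paste): the Cholesky path and the
# generic solver agree on a random SPD system of the same form as above.
def _demo_cholesky_solve() -> None:
    rng = np.random.default_rng(0)
    V = rng.normal(size=(20, 8))
    A = V.T @ V + 0.1 * np.eye(8)  # symmetric positive definite by construction
    b = rng.normal(size=8)
    c, low = cho_factor(A)
    assert np.allclose(cho_solve((c, low), b), np.linalg.solve(A, b))

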
class EASE:
    """
    EASE (Embarrassingly Shallow Autoencoders, Steck 2019).

    Closed-form item-item model: with G = X^T X + λI and P = G^{-1},
    B = P / (-diag(P)) with the diagonal zeroed out; scores are X @ B.
    The inverse of the regularized Gram matrix is dense, so the solve is
    done densely rather than through sparse inversion (which would also
    return a dense result, just more slowly).
    """

    def __init__(self, reg_lambda: float = 500.0) -> None:
        self.reg_lambda = reg_lambda
        self.b_matrix: np.ndarray | None = None

    def fit(self, user_item: coo_matrix) -> 'EASE':
        n_items = user_item.shape[1]
        # Item-item Gram matrix, densified
        gram_matrix = (user_item.T @ user_item).toarray()

        diag_indices = np.diag_indices(n_items)
        gram_matrix[diag_indices] += self.reg_lambda
        p_matrix = np.linalg.inv(gram_matrix)

        # Divide each column j by -P[j, j], then zero the diagonal
        b_matrix = p_matrix / (-np.diag(p_matrix))
        b_matrix[diag_indices] = 0.0
        self.b_matrix = b_matrix
        return self

    def predict(self, user_item: coo_matrix) -> np.ndarray:
        user_item = csr_matrix(user_item)
        scores = np.asarray(user_item @ self.b_matrix)
        # Mask items the user has already interacted with
        scores[user_item.nonzero()] = -np.inf
        return scores

    def recommend(self, user_id: int, user_item: coo_matrix, k: int = 10):
        scores = self.predict(user_item)
        user_scores = scores[user_id]

        topk_items = np.argsort(user_scores)[::-1][:k]
        topk_scores = user_scores[topk_items]
        return topk_items, topk_scores


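# Illustrative sketch (not in the original paste): EASE on a tiny binary
# interaction matrix. The 4x3 matrix and λ are arbitrary example values.
def _demo_ease() -> None:
    interactions = coo_matrix(np.array([
        [1, 0, 1],
        [1, 1, 0],
        [0, 1, 1],
        [1, 0, 0],
    ], dtype=float))
    ease = EASE(reg_lambda=10.0).fit(interactions)
    items, scores = ease.recommend(user_id=3, user_item=interactions, k=2)
    print(items, scores)  # user 3's two highest-scoring unseen items

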
def read_data(
    recommendations_path: str = '~/Downloads/recommendations.csv',
    games_path: str = '~/Downloads/games.csv',
    users_path: str = '~/Downloads/users.csv',
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    recommendations = (
        pd.read_csv(recommendations_path)
        # ISO-format date strings compare correctly lexicographically
        .query("'2014-01-01' < date < '2015-01-01'")
    )
    app_id = set(recommendations['app_id'].unique())
    # Remap raw app_ids to a contiguous 0..n-1 range usable as matrix indices
    games = (
        pd.read_csv(games_path)
        .loc[lambda x: x['app_id'].isin(app_id)]
        .reset_index(drop=True)
        .reset_index()
        .rename(columns={'app_id': 'old_app_id'})
        .rename(columns={'index': 'app_id'})
        .set_index('old_app_id')
    )
    games_idx = games['app_id'].to_dict()

    user_id = set(recommendations['user_id'].unique())
    # Same contiguous remapping for user_ids
    users = (
        pd.read_csv(users_path)
        .loc[lambda x: x['user_id'].isin(user_id)]
        .reset_index(drop=True)
        .reset_index()
        .rename(columns={'user_id': 'old_user_id'})
        .rename(columns={'index': 'user_id'})
        .set_index('old_user_id')
    )
    users_idx = users['user_id'].to_dict()
    recommendations = (
        recommendations
        .assign(
            date=lambda x: pd.to_datetime(x['date']),
            app_id=lambda x: x['app_id'].map(games_idx),
            user_id=lambda x: x['user_id'].map(users_idx)
        )
    )

    # recommendations['label'] = 0
    # recommendations.loc[recommendations['hours_played'] > 0, 'label'] = 1
    # recommendations.loc[recommendations['hours_played'] > 2, 'label'] = 2
    # recommendations.loc[recommendations['is_recommended'] == True, 'label'] = 3
    return recommendations, games, users


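# Illustrative sketch (not in the original paste) of the ID-remapping trick in
# read_data(): raw IDs become a contiguous 0..n-1 range so they can directly
# index rows/columns of a sparse matrix. The toy app_ids are made up.
def _demo_id_remap() -> None:
    raw = pd.DataFrame({'app_id': [730, 570, 730, 440]})
    idx = {app: i for i, app in enumerate(pd.unique(raw['app_id']))}
    raw['app_idx'] = raw['app_id'].map(idx)
    print(raw)  # 730 -> 0, 570 -> 1, 440 -> 2

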
def read_movielens(
    users_path: str = '~/Downloads/ml-10M100K/users.dat',
    ratings_path: str = '~/Downloads/ml-10M100K/ratings.dat',
    movies_path: str = '~/Downloads/ml-10M100K/movies.dat',
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    # Note: a users.dat with these columns ships with MovieLens 1M; the 10M
    # release only provides ratings.dat, movies.dat, and tags.dat.
    user_cols = ['user_id', 'gender', 'age', 'occupation', 'zip']
    rating_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
    movie_cols = ['movie_id', 'title', 'genres']

    # The .dat files use '::' as a separator, which requires the python engine
    users = pd.read_csv(users_path, sep='::', header=None, names=user_cols, engine='python')
    ratings = pd.read_csv(ratings_path, sep='::', header=None, names=rating_cols, engine='python')
    movies = pd.read_csv(movies_path, sep='::', header=None, names=movie_cols, engine='python')
    return users, ratings, movies


def recall_at_k(cands: dict[int, list[int]], target: dict[int, set[int]], k: int = 10) -> float:
    """
    :param cands: Dict[user_id, List[app_id]]
    :param target: Dict[user_id, Set[app_id]]
    :param k: metric threshold
    :return: recall at k
    """
    recalls_num = 0.0
    recalls_denom = 0
    for user_id, app_ids in tqdm(cands.items()):
        app_ids = set(app_ids[:k])
        target_apps = target[user_id]
        if not target_apps:
            continue  # skip users with no relevant items to avoid zero division
        intersect = target_apps & app_ids
        recalls_num += len(intersect) / len(target_apps)
        recalls_denom += 1
    return recalls_num / recalls_denom


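# Worked check (not in the original paste): user 1's top-2 candidates hit one
# of their two relevant items, so recall@2 is 0.5.
def _demo_recall_at_k() -> None:
    assert recall_at_k({1: [10, 20, 30]}, {1: {20, 99}}, k=2) == 0.5

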
class CandGen(ABC):

    @abstractmethod
    def get(self, user_id: int) -> Iterator[int]:
        raise NotImplementedError

    @abstractmethod
    def has(self, user_id: int) -> bool:
        raise NotImplementedError


class DataCandGen(CandGen):

    def __init__(self, cands: dict[int, list[int]]) -> None:
        self.cands = cands

    def get(self, user_id: int) -> Iterator[int]:
        yield from self.cands.get(user_id, [])

    def has(self, user_id: int) -> bool:
        return user_id in self.cands


class ConstantCandGen(CandGen):

    def __init__(self, items: list[int]) -> None:
        self.items = items

    def get(self, user_id: int) -> Iterator[int]:
        yield from self.items

    def has(self, user_id: int) -> bool:
        return True


class FallbackCandGen(CandGen):

    def __init__(self, *generators: CandGen) -> None:
        self.generators = list(generators)

    def get(self, user_id: int) -> Iterator[int]:
        # Delegate to the first generator that covers this user
        for gen in self.generators:
            if gen.has(user_id):
                yield from gen.get(user_id)
                break

    def has(self, user_id: int) -> bool:
        return any(gen.has(user_id) for gen in self.generators)


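# Illustrative sketch (not in the original paste): composing candidate
# generators -- per-user candidates with a constant fallback for users the
# first generator does not cover. The IDs below are made up.
def _demo_cand_gen() -> None:
    gen = FallbackCandGen(
        DataCandGen(cands={0: [5, 3, 8]}),
        ConstantCandGen(items=[1, 2]),
    )
    print(list(gen.get(0)))    # [5, 3, 8] -- per-user candidates
    print(list(gen.get(99)))   # [1, 2]    -- constant fallback
    print(generate_log(gen, users=[0, 99]))  # flattened (user_id, app_id) log

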
def generate_log(cand_gen: CandGen, users: list[int]) -> pd.DataFrame:
    return pd.DataFrame.from_records(
        [
            {'user_id': user_id, 'app_id': app_id}
            for user_id in users
            for app_id in cand_gen.get(user_id)
        ]
    )


if __name__ == '__main__':
    recommendations, games, users = read_data()
    train = recommendations.query("date < '2014-11-01'")
    test = recommendations.query("date >= '2014-11-01'")
    # Fix the shape explicitly so the train and test matrices agree even when
    # the highest-numbered users/items appear in only one of the splits
    shape = (len(users), len(games))
    train_sparse = coo_matrix(
        (train['is_recommended'].astype(float), (train['user_id'], train['app_id'])),
        shape=shape,
    ).tocsr()
    test_sparse = coo_matrix(
        (test['is_recommended'].astype(float), (test['user_id'], test['app_id'])),
        shape=shape,
    ).tocsr()

    topk = 50
    most_popular = train[train['is_recommended']]['app_id'].value_counts(ascending=False)[:topk].index.tolist()

    import os
    # Note: thread-count variables like this only take effect if set before
    # NumPy is first imported, so setting it here is already too late
    os.environ['MKL_NUM_THREADS'] = "1"
    from implicit.als import AlternatingLeastSquares

    als = AlternatingLeastSquares(factors=40)
    als.fit(train_sparse)  # implicit >= 0.5 expects a user-item matrix here
    # Ask for topk candidates per user; recommend_all defaults to N=10, which
    # would cap recall@50. (recommend_all is the implicit 0.4.x API; on >= 0.5
    # use als.recommend with an array of user ids instead.)
    als_recommendations = als.recommend_all(train_sparse, N=topk)

    test_users = test['user_id'][test['is_recommended']].unique().tolist()
    ground_truth = test[test['is_recommended']].groupby('user_id').agg({'app_id': lambda x: set(x)})['app_id'].to_dict()
    preds = {}

    train_users = set(train['user_id'].unique())

    for user in ground_truth.keys():
        if user in train_users:
            preds[user] = list(als_recommendations[user])
        else:
            # Cold-start fallback: most popular items
            preds[user] = most_popular

    print(recall_at_k(
        cands=preds,
        target=ground_truth,
        k=topk
    ))

    cand_gen = FallbackCandGen(
        DataCandGen(cands={user_id: list(als_recommendations[user_id]) for user_id in train_users}),
        ConstantCandGen(items=most_popular),
    )
    ranker_train_log = generate_log(cand_gen, list(train_users))

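    # One plausible continuation (not in the original paste): the catboost
    # import and ranker_train_log suggest a second-stage ranker. Everything
    # below -- the label join, the bare app_id feature, and all
    # hyperparameters -- is an illustrative assumption, not the author's code.
    ranker_train = (
        ranker_train_log
        .merge(
            train[['user_id', 'app_id', 'is_recommended']],
            on=['user_id', 'app_id'],
            how='left',
        )
        .fillna({'is_recommended': False})
        .sort_values('user_id')  # CatBoost requires group members to be contiguous
    )
    pool = catboost.Pool(
        data=ranker_train[['app_id']],  # placeholder feature set
        label=ranker_train['is_recommended'].astype(int),
        group_id=ranker_train['user_id'],
    )
    ranker = catboost.CatBoostRanker(loss_function='YetiRank', iterations=100, verbose=False)
    ranker.fit(pool)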