Source code for qolmat.imputations.rpca.rpca_noisy

"""Script for the noisy RPCA."""

from __future__ import annotations

import warnings
from typing import Dict, List, Optional, Tuple

import numpy as np
import scipy as scp
from numpy.typing import NDArray
from scipy.sparse import dok_matrix, identity
from scipy.sparse.linalg import spsolve
from sklearn import utils as sku
from tqdm import tqdm

from qolmat.imputations.rpca import rpca_utils
from qolmat.imputations.rpca.rpca import RPCA
from qolmat.utils import utils
from qolmat.utils.utils import RandomSetting


[docs]class RpcaNoisy(RPCA): """Class for a noisy version of the so-called 'improved RPCA'. References ---------- Wang, Xuehui, et al. "An improved robust principal component analysis model for anomalies detection of subway passenger flow." Journal of advanced transportation (2018). Chen, Yuxin, et al. "Bridging convex and nonconvex optimization in robust PCA: Noise, outliers and missing data." The Annals of Statistics 49.5 (2021): 2948-2971. Parameters ---------- random_state : int, optional The seed of the pseudo random number generator to use, for reproducibility. rank: Optional[int] Upper bound of the rank to be estimated mu: Optional[float] initial stiffness parameter for the constraint M = L Q tau: Optional[float] penalizing parameter for the nuclear norm lam: Optional[float] penalizing parameter for the sparse matrix list_periods: Optional[List[int]] list of periods, linked to the Toeplitz matrices list_etas: Optional[List[float]] list of penalizing parameters for the corresponding period in list_periods max_iterations: Optional[int] stopping criteria, maximum number of iterations. By default, the value is set to 10_000 tolerance: Optional[float] stopping criteria, minimum difference between 2 consecutive iterations. By default, the value is set to 1e-6 norm: Optional[str] error norm, can be "L1" or "L2". By default, the value is set to "L2" verbose: Optional[bool] verbosity level, if False the warnings are silenced """
[docs] def __init__( self, random_state: RandomSetting = None, rank: Optional[int] = None, mu: Optional[float] = None, tau: Optional[float] = None, lam: Optional[float] = None, list_periods: List[int] = [], list_etas: List[float] = [], max_iterations: int = int(1e4), tolerance: float = 1e-6, norm: str = "L2", verbose: bool = True, ) -> None: super().__init__(max_iterations=max_iterations, tolerance=tolerance, verbose=verbose) self.rng = sku.check_random_state(random_state) self.rank = rank self.mu = mu self.tau = tau self.lam = lam self.list_periods = list_periods self.list_etas = list_etas self.norm = norm
[docs] def get_params_scale(self, D: NDArray) -> Dict[str, float]: """Get parameters for scaling in RPCA based on the input data. Parameters ---------- D : np.ndarray Input data matrix of shape (m, n). Returns ------- dict A dictionary containing the following parameters: - "rank" : int Rank estimate for low-rank matrix decomposition. - "tau" : float Regularization parameter for the temporal correlations. - "lam" : float Regularization parameter for the L1 norm. """ rank = rpca_utils.approx_rank(D) tau = 1.0 / np.sqrt(max(D.shape)) lam = tau return { "rank": rank, "tau": tau, "lam": lam, }
[docs] def decompose(self, D: NDArray, Omega: NDArray) -> Tuple[NDArray, NDArray]: """Compute the noisy RPCA with L1 or L2 time penalisation. Parameters ---------- D : NDArray Matrix of the observations Omega: NDArray Matrix of missingness, with boolean data Returns ------- M: NDArray Low-rank signal A: NDArray Anomalies """ M, A, _, _ = self.decompose_with_basis(D, Omega) return M, A
[docs] def decompose_with_basis( self, D: NDArray, Omega: NDArray ) -> Tuple[NDArray, NDArray, NDArray, NDArray]: """Compute the noisy RPCA with L1 or L2 time penalisation. It returns the decomposition of the low-rank matrix. Parameters ---------- D : NDArray Matrix of the observations Omega: NDArray Matrix of missingness, with boolean data Returns ------- M: NDArray Low-rank signal A: NDArray Anomalies L: NDArray Coefficients of the low-rank matrix in the reduced basis Q: NDArray Reduced basis of the low-rank matrix """ D = utils.linear_interpolation(D) self.params_scale = self.get_params_scale(D) if self.lam is not None: self.params_scale["lam"] = self.lam if self.rank is not None: self.params_scale["rank"] = self.rank if self.tau is not None: self.params_scale["tau"] = self.tau lam = self.params_scale["lam"] rank = int(self.params_scale["rank"]) tau = self.params_scale["tau"] mu = 1e-2 if self.mu is None else self.mu n_rows, n_cols = D.shape for period in self.list_periods: if not period < n_rows: raise ValueError( "The periods provided in argument in `list_periods` " "must smaller than the number of rows " f"in the matrix but {period} >= {n_rows}!" ) M, A, L, Q = self.minimise_loss( D, Omega, rank, tau, lam, mu, self.list_periods, self.list_etas, max_iterations=self.max_iterations, tolerance=self.tolerance, norm=self.norm, verbose=self.verbose, ) self._check_cost_function_minimized(D, M, A, Omega, tau, lam) return M, A, L, Q
[docs] @staticmethod def minimise_loss( D: NDArray, Omega: NDArray, rank: int, tau: float, lam: float, mu: float = 1e-2, list_periods: List[int] = [], list_etas: List[float] = [], max_iterations: int = 10000, tolerance: float = 1e-6, norm: str = "L2", verbose: bool = False, ) -> Tuple: """Compute the noisy RPCA with a L2 time penalisation. This function computes the noisy Robust Principal Component Analysis (RPCA) using a L2 time penalisation. It iteratively minimizes a loss function to separate the low-rank and sparse components from the input data matrix. Parameters ---------- D : np.ndarray Observations matrix of shape (m, n). Omega : np.ndarray Binary matrix indicating the observed entries of D, shape (m, n). rank : int Estimated low-rank of the matrix D. tau : float Penalizing parameter for the nuclear norm. lam : float Penalizing parameter for the sparse matrix. mu : float, optional Initial stiffness parameter for the constraint on M, L, and Q. Defaults to 1e-2. list_periods : List[int], optional List of periods linked to the Toeplitz matrices. Defaults to []. list_etas : List[float], optional List of penalizing parameters for the corresponding periods in list_periods. Defaults to []. max_iterations : int, optional Stopping criteria, maximum number of iterations. Defaults to 10000. tolerance : float, optional Stopping criteria, minimum difference between 2 consecutive iterations. Defaults to 1e-6. norm : str, optional Error norm, can be "L1" or "L2". Defaults to "L2". verbose : bool, optional Verbosity level, if False the warnings are silenced. Defaults to False. Returns ------- Tuple A tuple containing the following elements: - M : np.ndarray Low-rank signal matrix of shape (m, n). - A : np.ndarray Anomalies matrix of shape (m, n). - L : np.ndarray Basis unitary array of shape (m, rank). - Q : np.ndarray Basis unitary array of shape (rank, n). Raises ------ ValueError If the periods provided in the argument in `list_periods` are not smaller than the number of rows in the matrix. """ rho = 1.1 n_rows, n_cols = D.shape # init Y = np.zeros((n_rows, n_cols)) M = D.copy() A: NDArray = np.zeros((n_rows, n_cols)) U, S, Vt = np.linalg.svd(M, full_matrices=False) U = U[:, :rank] S = S[:rank] Vt = Vt[:rank, :] L = U @ np.diag(np.sqrt(S)) Q = np.diag(np.sqrt(S)) @ Vt if norm == "L1": R: list[NDArray] = [np.ones((n_rows, n_cols)) for _ in list_periods] mu_bar = mu * 1e3 # matrices for temporal correlation list_H = [rpca_utils.toeplitz_matrix(period, n_rows) for period in list_periods] HtH = dok_matrix((n_rows, n_rows)) for i_period, _ in enumerate(list_periods): HtH += list_etas[i_period] * (list_H[i_period].T @ list_H[i_period]) Ir = np.eye(rank) In = identity(n_rows) with tqdm( total=max_iterations, desc="Noisy RPCA loss minimization", unit="iteration", disable=not verbose, ) as pbar: for _ in range(max_iterations): M_temp = M.copy() A_temp = A.copy() L_temp = L.copy() Q_temp = Q.copy() if norm == "L1": R_temp = R.copy() sums = np.zeros((n_rows, n_cols)) for i_period, _ in enumerate(list_periods): sums += mu * R[i_period] - list_H[i_period] @ Y M = spsolve( (1 + mu) * In + HtH, D - A + mu * L @ Q - Y + sums, ) else: M = spsolve( (1 + mu) * In + 2 * HtH, D - A + mu * L @ Q - Y, ) M = M.reshape(D.shape) A_Omega = rpca_utils.soft_thresholding(D - M, lam) A_Omega_C = D - M A = np.where(Omega, A_Omega, A_Omega_C) Q = scp.linalg.solve( a=tau * Ir + mu * (L.T @ L), b=L.T @ (mu * M + Y), ) L = scp.linalg.solve( a=tau * Ir + mu * (Q @ Q.T), b=Q @ (mu * M.T + Y.T), ).T Y += mu * (M - L @ Q) if norm == "L1": for i_period, _ in enumerate(list_periods): eta = list_etas[i_period] R[i_period] = rpca_utils.soft_thresholding(R[i_period] / mu, eta / mu) mu = min(mu * rho, mu_bar) Mc = np.linalg.norm(M - M_temp, np.inf) Ac = np.linalg.norm(A - A_temp, np.inf) Lc = np.linalg.norm(L - L_temp, np.inf) Qc = np.linalg.norm(Q - Q_temp, np.inf) error_max = max([Mc, Ac, Lc, Qc]) # type: ignore # noqa if norm == "L1": for i_period, _ in enumerate(list_periods): Rc = np.linalg.norm(R[i_period] - R_temp[i_period], np.inf) error_max = max(error_max, Rc) # type: ignore # noqa if error_max < tolerance: break pbar.set_postfix(error=f"{error_max.item():.4f}") pbar.update(1) M = L @ Q M = M return M, A, L, Q
[docs] def decompose_on_basis( self, D: NDArray, Omega: NDArray, Q: NDArray, ) -> Tuple[NDArray, NDArray]: """Decompose the matrix D with an observation matrix Omega. It uses the noisy RPCA algorithm, with a fixed reduced basis given by the matrix Q. This allows to impute new data without resolving the optimization problem on the whole dataset. Parameters ---------- D : NDArray _description_ Omega : NDArray _description_ Q : NDArray _description_ Returns ------- Tuple[NDArray, NDArray] A tuple representing the decomposition of D with: - M: low-rank matrix - A: sparse matrix """ D = utils.linear_interpolation(D) params_scale = self.get_params_scale(D) lam = params_scale["lam"] if self.lam is None else self.lam rank = params_scale["rank"] if self.rank is None else self.rank rank = int(rank) tau = params_scale["tau"] if self.tau is None else self.tau tolerance = self.tolerance n_rows, n_cols = D.shape if n_rows == 1 or n_cols == 1: return D, np.full_like(D, 0) # M, A, L, Q = self.decompose_rpca(D, Omega) n_rank, _ = Q.shape Ir = np.eye(n_rank) A: NDArray = np.zeros((n_rows, n_cols)) L = np.zeros((n_rows, n_rank)) for _ in range(self.max_iterations): A_prev = A.copy() L_prev = L.copy() L = scp.linalg.solve( a=2 * tau * Ir + (Q @ Q.T), b=Q @ (D - A).T, ).T A_Omega = rpca_utils.soft_thresholding(D - L @ Q, lam) A_Omega_C = D - L @ Q A = np.where(Omega, A_Omega, A_Omega_C) Ac = np.linalg.norm(A - A_prev, np.inf) Lc = np.linalg.norm(L - L_prev, np.inf) tolerance = max([Ac, Lc]) # type: ignore # noqa if tolerance < tolerance: break M = L @ Q return M, A
def _check_cost_function_minimized( self, D: NDArray, M: NDArray, A: NDArray, Omega: NDArray, tau: float, lam: float, ): """Check cost function. The functional minimized by the RPCA is smaller at the end than at the beginning. Parameters ---------- D : NDArray observations matrix with first linear interpolation M : NDArray low_rank matrix resulting from RPCA A : NDArray sparse matrix resulting from RPCA Omega: NDArrau boolean matrix indicating the observed values tau : float parameter penalizing the nuclear norm of the low rank part lam : float parameter penalizing the L1-norm of the anomaly/sparse part """ cost_start = self.cost_function( D, D, np.full_like(D, 0), Omega, tau, lam, self.list_periods, self.list_etas, norm=self.norm, ) cost_end = self.cost_function( D, M, A, Omega, tau, lam, self.list_periods, self.list_etas, norm=self.norm, ) function_str = "1/2 ||D-M-A||_2 + tau ||D||_* + lam ||A||_1" if len(self.list_etas) > 0: for eta in self.list_etas: function_str += f"{eta} ||MH||_{self.norm}" if self.verbose and (cost_end > cost_start * (1 + 1e-6)): warnings.warn( "RPCA algorithm may provide bad results. " f"Function {function_str} increased from" f" {cost_start} to {cost_end} instead of decreasing!".format("%.2f") )
[docs] @staticmethod def cost_function( D: NDArray, M: NDArray, A: NDArray, Omega: NDArray, tau: float, lam: float, list_periods: List[int] = [], list_etas: List[float] = [], norm: str = "L2", ): """Estimate cost function for the noisy RPCA algorithm. Parameters ---------- D : NDArray Matrix of observations M : NDArray Low-rank signal A : NDArray Anomalies Omega : NDArray Mask for observations tau: Optional[float] penalizing parameter for the nuclear norm lam: Optional[float] penalizing parameter for the sparse matrix list_periods: Optional[List[int]] list of periods, linked to the Toeplitz matrices list_etas: Optional[List[float]] list of penalizing parameters for the corresponding period in list_periods norm: Optional[str] error norm, can be "L1" or "L2". By default, the value is set to "L2" Returns ------- float Value of the cost function minimized by the RPCA """ temporal_norm: float = 0 if len(list_etas) > 0: # matrices for temporal correlation list_H = [rpca_utils.toeplitz_matrix(period, D.shape[0]) for period in list_periods] if norm == "L1": for eta, H_matrix in zip(list_etas, list_H): temporal_norm += eta * np.sum(np.abs(H_matrix @ M)) elif norm == "L2": for eta, H_matrix in zip(list_etas, list_H): temporal_norm += eta * float(np.linalg.norm(H_matrix @ M, "fro")) anomalies_norm = np.sum(np.abs(A * Omega)) cost = ( 1 / 2 * ((Omega * (D - M - A)) ** 2).sum() + tau * np.linalg.norm(M, "nuc") + lam * anomalies_norm + temporal_norm ) return cost