Source code for qolmat.imputations.em_sampler

"""Script for EM imputation."""

import logging
import warnings
from abc import abstractmethod
from typing import Dict, List, Literal, Tuple, Union

import numpy as np
from numpy.typing import NDArray
from scipy import linalg as spl
from scipy import optimize as spo
from sklearn import utils as sku
from sklearn.base import BaseEstimator, TransformerMixin
from tqdm import tqdm

from qolmat.utils import utils
from qolmat.utils.utils import RandomSetting

logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)


def _conjugate_gradient(A: NDArray, X: NDArray, mask: NDArray) -> NDArray:
    """Compute conjugate gradient.

    Minimize Tr(X.T AX) wrt X where X is constrained to the initial value
    outside the given mask To this aim, we compute in parallel a gradient
    algorithm for each row.

    Parameters
    ----------
    A : NDArray
        Symmetrical matrix defining the quadratic minimization problem
    X : NDArray
        Array containing the values to optimize
    mask : NDArray
        Boolean array indicating if a value of X is a variable of
        the optimization

    Returns
    -------
    NDArray
        Minimized array.

    """
    rows_imputed = mask.any(axis=1)
    X_temp = X[rows_imputed, :].copy()
    mask = mask[rows_imputed, :].copy()
    n_iter = mask.sum(axis=1).max()
    n_rows, n_cols = X_temp.shape
    X_temp[mask] = 0
    b = -X_temp @ A
    b[~mask] = 0
    xn, pn, rn = np.zeros((n_rows, n_cols)), b, b  # Initialisation
    alphan = np.zeros(n_rows)
    betan = np.zeros(n_rows)
    for n in range(n_iter + 2):
        # if np.max(np.sum(rn**2)) < tolerance :
        #     X_temp[mask_isna] = xn[mask_isna]
        #     return X_temp.transpose()
        Apn = pn @ A
        Apn[~mask] = 0
        numerator = np.sum(rn**2, axis=1)
        denominator = np.sum(pn * Apn, axis=1)
        not_converged = denominator != 0
        # we stop updating if convergence is reached for this row
        alphan[not_converged] = numerator[not_converged] / denominator[not_converged]

        xn, rnp1 = xn + pn * alphan[:, None], rn - Apn * alphan[:, None]
        numerator = np.sum(rnp1**2, axis=1)
        denominator = np.sum(rn**2, axis=1)
        not_converged = denominator != 0
        # we stop updating if convergence is reached for this row
        betan[not_converged] = numerator[not_converged] / denominator[not_converged]

        pn, rn = rnp1 + pn * betan[:, None], rnp1

    X_temp[mask] = xn[mask]
    X_final = X.copy()
    X_final[rows_imputed, :] = X_temp

    return X_final


def max_diff_Linf(list_params: List[NDArray], n_steps: int, order: int = 1) -> float:
    """Compute the maximal L infinity norm.

    Computed between the `n_steps` last elements spaced by order.
    Used to compute the stop criterion.

    Parameters
    ----------
    list_params : List[NDArray]
        List of statistics from the samples
    n_steps : int
        Number of steps to take into account
    order : int, optional
        Space between compared statistics, by default 1

    Returns
    -------
    float
        Minimal norm of differences

    """
    params = np.stack(list_params[-n_steps - order : -order])
    params_shift = np.stack(list_params[-n_steps:])
    min_diff = np.max(np.abs(params - params_shift))
    return min_diff


class EM(BaseEstimator, TransformerMixin):
    """Abstract class for EM imputation.

    It uses imputation through EM optimization and
    a projected MCMC sampling process.

    Parameters
    ----------
    method : Literal["mle", "sample"]
        Method for imputation, choose among "mle" or "sample".
    max_iter_em : int, optional
        Maximum number of steps in the EM algorithm
    n_iter_ou : int, optional
        Number of iterations for the Gibbs sampling method (+ noise addition),
        necessary for convergence, by default 50.
    n_samples : int, optional
        Number of data samples used to estimate the parameters of the
        distribution. Default, 10
    ampli : float, optional
        Whether to sample the posterior (1)
        or to maximise likelihood (0), by default 1.
    random_state : int, optional
        The seed of the pseudo random number generator to use,
        for reproducibility.
    dt : float, optional
        Process integration time step, a large value increases the sample bias
        and can make the algorithm unstable, but compensates for a
        smaller n_iter_ou. By default, 2e-2.
    tolerance : float, optional
        Threshold below which a L infinity norm difference indicates the
        convergence of the parameters
    stagnation_threshold : float, optional
        Threshold below which a stagnation of the L infinity norm difference
        indicates the convergence of the parameters
    stagnation_loglik : float, optional
        Threshold below which an absolute difference of the log likelihood
        indicates the convergence of the parameters
    min_std: float, optional
        Threshold below which the initial data matrix is considered
        ill-conditioned
    period : int, optional
        Integer used to fold the temporal data periodically
    verbose : bool, optional
        Verbosity level, if False the warnings are silenced

    """

    def __init__(
        self,
        method: Literal["mle", "sample"] = "sample",
        max_iter_em: int = 500,
        n_iter_ou: int = 50,
        n_samples: int = 10,
        ampli: float = 1,
        random_state: RandomSetting = None,
        dt: float = 2e-2,
        tolerance: float = 1e-4,
        stagnation_threshold: float = 5e-3,
        stagnation_loglik: float = 2,
        min_std: float = 1e-6,
        period: int = 1,
        verbose: bool = False,
    ):
        if method not in ["mle", "sample"]:
            raise ValueError(f"`method` must be 'mle' or 'sample', provided value is '{method}'.")

        self.method = method
        self.max_iter_em = max_iter_em
        self.n_iter_ou = n_iter_ou
        self.ampli = ampli
        self.rng = sku.check_random_state(random_state)
        self.dt = dt
        self.tolerance = tolerance
        self.stagnation_threshold = stagnation_threshold
        self.stagnation_loglik = stagnation_loglik

        self.min_std = min_std

        self.dict_criteria_stop: Dict[str, List] = {}
        self.period = period
        self.verbose = verbose
        self.n_samples = n_samples
        self.hash_fit = 0
        self.shape = (0, 0)

    def _check_convergence(self) -> bool:
        return False

    @abstractmethod
    def reset_learned_parameters(self):
        """Reset learned parameters."""
        raise NotImplementedError("Method reset_learned_parameters not implemented.")

    @abstractmethod
    def update_parameters(self, X: NDArray):
        """Update parameters."""
        raise NotImplementedError("Method update_parameters not implemented.")

    @abstractmethod
    def combine_parameters(self):
        """Combine parameters."""
        raise NotImplementedError("Method combine_parameters not implemented.")

    def fit_parameters(self, X: NDArray):
        """Fir parameters.

        Parameters
        ----------
        __________
        X: NDArray
            Array to compute the parameters.

        """
        self.reset_learned_parameters()
        self.update_parameters(X)
        self.combine_parameters()

    def fit_parameters_with_missingness(self, X: NDArray):
        """Fit the first estimation of the model parameters.

        It is based on data with missing values.

        Parameters
        ----------
        X : NDArray
            Data matrix with missingness

        """
        X_imp = self.init_imputation(X)
        self.fit_parameters(X_imp)

    def update_criteria_stop(self, X: NDArray):
        """Update the stopping criteria based on X.

        Parameters
        ----------
        X : NDArray
            array used to compute log likelihood.

        """
        self.loglik = self.get_loglikelihood(X)

    @abstractmethod
    def get_loglikelihood(self, X: NDArray) -> float:
        """Compute the loglikelihood of an array.

        Parameters
        ----------
        X : NDArray
            Input array.

        Returns
        -------
        float
            log-likelihood.

        """
        return 0

    @abstractmethod
    def gradient_X_loglik(
        self,
        X: NDArray,
    ) -> NDArray:
        """Compute the gradient X loglik.

        Parameters
        ----------
        X : NDArray
            input array

        Returns
        -------
        NDArray
            gradient

        """
        return np.empty  # type: ignore #noqa

    def get_gamma(self, n_cols: int) -> NDArray:
        """Get gamma.

        Normalization matrix in the sampling process.

        Parameters
        ----------
        n_cols : int
            Number of variables in the data matrix

        Returns
        -------
        NDArray
            Gamma matrix

        """
        # return np.ones((1, n_cols))
        return np.eye(n_cols)

    def _maximize_likelihood(self, X: NDArray, mask_na: NDArray) -> NDArray:
        """Get the argmax of a posterior distribution using the BFGS algorithm.

        Parameters
        ----------
        X : NDArray
            Input numpy array without missingness
        mask_na : NDArray
            Boolean dataframe indicating which coefficients should be
            resampled, and are therefore the variables of the optimization

        Returns
        -------
        NDArray
            DataFrame with imputed values.

        """

        def fun_obj(x):
            x_mat = X.copy()
            x_mat[mask_na] = x
            return -self.get_loglikelihood(x_mat)

        def fun_jac(x):
            x_mat = X.copy()
            x_mat[mask_na] = x
            grad_x = -self.gradient_X_loglik(x_mat)
            grad_x = grad_x[mask_na]
            return grad_x

        # the method BFGS is much slower, probability not adapted
        # to the high-dimension setting
        res = spo.minimize(fun_obj, X[mask_na], jac=fun_jac, method="CG")
        x = res.x

        X_sol = X.copy()
        X_sol[mask_na] = x
        return X_sol

    def _sample_ou(
        self,
        X: NDArray,
        mask_na: NDArray,
        estimate_params: bool = True,
    ) -> NDArray:
        """Sample the Gaussian distribution.

        Under the constraint that not na values must remain
        unchanged, using a projected Ornstein-Uhlenbeck process.
        The sampled distribution tends to the target distribution
        in the limit dt -> 0 and n_iter_ou x dt -> infty.

        Parameters
        ----------
        X : NDArray
            Inital dataframe to be imputed, which should have been already
            imputed using a simple method. This first imputation will be used
            as an initial guess.
        mask_na : NDArray
            Boolean dataframe indicating which coefficients should be
            resampled.
        estimate_params : bool
            Indicates if the parameters of the distribution should be estimated
            while the data are sampled.

        Returns
        -------
        NDArray
            Sampled data matrix

        """
        X_copy = X.copy()
        n_rows, n_cols = X_copy.shape
        if estimate_params:
            self.reset_learned_parameters()
        X_init = X.copy()
        gamma = self.get_gamma(n_cols)
        sqrt_gamma = np.real(spl.sqrtm(gamma))

        for i in range(self.n_iter_ou):
            noise = self.ampli * self.rng.normal(0, 1, size=(n_rows, n_cols))
            grad_X = -self.gradient_X_loglik(X_copy)
            X_copy += -self.dt * grad_X @ gamma + np.sqrt(2 * self.dt) * noise @ sqrt_gamma
            X_copy[~mask_na] = X_init[~mask_na]

            if estimate_params:
                self.update_parameters(X_copy)

        return X_copy

    def fit_X(self, X: NDArray) -> None:
        """Ft X array.

        Parameters
        ----------
        X : NDArray
            Input array.

        """
        mask_na = np.isnan(X)

        # first imputation
        X_imp = self.init_imputation(X)
        self._check_conditioning(X_imp)

        self.fit_parameters_with_missingness(X)

        if not np.any(mask_na):
            self.X = X
            return

        X = self._maximize_likelihood(X_imp, mask_na)

        for iter_em in tqdm(
            range(self.max_iter_em),
            desc="EM parameters estimation",
            disable=not self.verbose,
        ):
            X = self._sample_ou(X, mask_na)

            self.combine_parameters()

            # Stop criteria
            self.update_criteria_stop(X)
            if self._check_convergence():
                if self.verbose:
                    logging.info(f"EM converged after {iter_em} iterations.")
                break

        self.dict_criteria_stop = {key: [] for key in self.dict_criteria_stop}
        self.X = X

    def fit(self, X: NDArray) -> "EM":
        """Fit the statistical distribution with the input X array.

        Parameters
        ----------
        X : NDArray
            Numpy array to be imputed

        """
        X = X.copy()
        sku.validation.validate_data(self, X, ensure_all_finite="allow-nan", dtype="float")
        self.shape_original = X.shape

        if not isinstance(X, np.ndarray):
            raise AssertionError("Invalid type. X must be a NDArray.")
        self.hash_fit = hash(X.tobytes())

        X = utils.prepare_data(X, self.period)

        if hasattr(self, "p_to_fit") and self.p_to_fit:
            aics: List[float] = []
            for p in range(self.max_lagp + 1):
                self.p = p
                self.fit_X(X)
                n1, n2 = self.X.shape
                det = np.linalg.det(self.S)
                if abs(det) < 1e-12:
                    aic = -np.inf
                else:
                    aic = np.log(det) + 2 * p * (n2**2) / n1
                if len(aics) > 0 and aic > aics[-1]:
                    break
                aics.append(aic)
                if aic == -np.inf:
                    break
            self.p = int(np.argmin(aics))
            self.fit_X(X)

        else:
            self.fit_X(X)

        return self

    def transform(self, X: NDArray) -> NDArray:
        """Transform the input X array by imputing the missing values.

        Parameters
        ----------
        X : NDArray
            Numpy array to be imputed

        Returns
        -------
        NDArray
            Final array after EM sampling.

        """
        mask_na = np.isnan(X)
        X = X.copy()
        # sku.check_array(X, ensure_all_finite="allow-nan", dtype="float")
        sku.validation.validate_data(
            self, X, ensure_all_finite="allow-nan", dtype="float", reset=False
        )

        # shape_original = X.shape
        if hash(X.tobytes()) == self.hash_fit:
            X = self.X
            warm_start = True
        else:
            X = utils.prepare_data(X, self.period)
            X = self.init_imputation(X)
            warm_start = False

        X, mask_na = self.pretreatment(X, mask_na)

        if (self.method == "mle") or not warm_start:
            X = self._maximize_likelihood(X, mask_na)
        if self.method == "sample":
            X = self._sample_ou(X, mask_na, estimate_params=False)

        if np.all(np.isnan(X)):
            raise AssertionError("Result contains NaN. This is a bug.")

        return X

    def pretreatment(self, X, mask_na) -> Tuple[NDArray, NDArray]:
        """Pretreat the data before imputation by EM, making it more robust.

        Parameters
        ----------
        X : NDArray
            Data matrix without nans
        mask_na : NDArray
            Boolean matrix indicating which entries are to be imputed

        Returns
        -------
        Tuple[NDArray, NDArray]
            A tuple containing:
            - X the pretreated data matrix
            - mask_na the updated mask

        """
        return X, mask_na

    def _check_conditioning(self, X: NDArray):
        """Check that the data matrix X is not ill-conditioned.

        Running the EM algorithm on data with colinear columns leads to
        numerical instability and unconsistent results.

        Parameters
        ----------
        X : NDArray
            Data matrix

        Raises
        ------
        IllConditioned
            Data matrix is ill-conditioned due to colinear columns.

        """
        n_samples, n_cols = X.shape
        # if n_rows == 1 the function np.cov returns a float
        if n_samples == 1:
            raise ValueError("EM cannot be fitted when n_samples = 1!")

        cov = np.cov(X, bias=True, rowvar=False).reshape(n_cols, -1)
        _, sv, _ = spl.svd(cov)
        min_sv = min(np.sqrt(sv))
        if min_sv < self.min_std:
            warnings.warn(
                "The covariance matrix is ill-conditioned, "
                "indicating high-colinearity: the "
                "smallest singular value of the data matrix is smaller "
                "than the threshold "
                f"min_std ({min_sv} < {self.min_std}). "
                "Consider removing columns of decreasing the threshold."
            )


[docs]class MultiNormalEM(EM): """Multinormal EM imputer. Imputation of missing values using a multivariate Gaussian model through EM optimization and using a projected Ornstein-Uhlenbeck process. Parameters ---------- method : Literal["mle", "sample"] Method for imputation, choose among "sample" or "mle". max_iter_em : int, optional Maximum number of steps in the EM algorithm n_iter_ou : int, optional Number of iterations for the Gibbs sampling method (+ noise addition), necessary for convergence, by default 50. n_samples : int, optional Number of data samples used to estimate the parameters of the distribution. Default, 10 ampli : float, optional Whether to sample the posterior (1) or to maximise likelihood (0), by default 1. random_state : int, optional The seed of the pseudo random number generator to use, for reproducibility. dt : float Process integration time step, a large value increases the sample bias and can make the algorithm unstable, but compensates for a smaller n_iter_ou. By default, 2e-2. tolerance : float, optional Threshold below which a L infinity norm difference indicates the convergence of the parameters stagnation_threshold : float, optional Threshold below which a L infinity norm difference indicates the convergence of the parameters stagnation_loglik : float, optional Threshold below which an absolute difference of the log likelihood indicates the convergence of the parameters period : int, optional Integer used to fold the temporal data periodically verbose : bool, optional Verbosity level, if False the warnings are silenced """
[docs] def __init__( self, method: Literal["mle", "sample"] = "sample", max_iter_em: int = 200, n_iter_ou: int = 50, n_samples: int = 10, ampli: float = 1, random_state: RandomSetting = None, dt: float = 2e-2, tolerance: float = 1e-4, stagnation_threshold: float = 5e-3, stagnation_loglik: float = 2, period: int = 1, verbose: bool = False, ) -> None: super().__init__( method=method, max_iter_em=max_iter_em, n_iter_ou=n_iter_ou, n_samples=n_samples, ampli=ampli, random_state=random_state, dt=dt, tolerance=tolerance, stagnation_threshold=stagnation_threshold, stagnation_loglik=stagnation_loglik, period=period, verbose=verbose, ) self.cov = np.array([[]]) self.dict_criteria_stop = {"logliks": [], "means": [], "covs": []}
[docs] def get_loglikelihood(self, X: NDArray) -> float: """Get the log-likelihood. Value of the log-likelihood up to a constant for the provided X, using the attributes `means` and `cov_inv` for the multivariate normal distribution. Parameters ---------- X : NDArray Input matrix with variables in column Returns ------- float Computed value """ Xc = X - self.means return -((Xc @ self.cov_inv) * Xc).sum().sum() / 2
[docs] def gradient_X_loglik(self, X: NDArray) -> NDArray: """Compute the gradient of the log-likelihood for the provided X. It uses the attributes `means` and `cov_inv` for the multivariate normal distribution. Parameters ---------- X : NDArray Input matrix with variables in column Returns ------- NDArray The gradient of the log-likelihood with respect to the input variable `X`. """ grad_X = -(X - self.means) @ self.cov_inv return grad_X
[docs] def get_gamma(self, n_cols: int) -> NDArray: """Get gamma. If the covariance matrix is not full-rank, defines the projection matrix keeping the sampling process in the relevant subspace. Parameters ---------- n_cols : int Number of variables in the data matrix Returns ------- NDArray Gamma matrix """ U, diag, Vt = spl.svd(self.cov) diag_trunc = np.where(diag < self.min_std**2, 0, diag) diag_trunc = np.where(diag_trunc == 0, 0, np.min(diag_trunc)) gamma = (U * diag_trunc) @ Vt return gamma
[docs] def update_criteria_stop(self, X: NDArray): """Update the variables to compute the stopping criteria. Parameters ---------- X : NDArray Input matrix with variables in column """ self.loglik = self.get_loglikelihood(X) self.dict_criteria_stop["means"].append(self.means) self.dict_criteria_stop["covs"].append(self.cov) self.dict_criteria_stop["logliks"].append(self.loglik)
[docs] def reset_learned_parameters(self): """Reset lists of parameters before starting a new estimation.""" self.list_means = [] self.list_cov = []
[docs] def update_parameters(self, X): """Retain statistics relative to the current sample. Parameters ---------- X : NDArray Input matrix with variables in column """ n_rows, n_cols = X.shape means = np.mean(X, axis=0) self.list_means.append(means) # reshaping for 1D input if n_rows == 1: cov = np.zeros((n_cols, n_cols)) else: cov = np.cov(X, bias=True, rowvar=False).reshape(n_cols, -1) self.list_cov.append(cov)
[docs] def combine_parameters(self): """Combine all statistics computed for each sample in the update step. If uses the MANOVA formula. """ list_means = self.list_means[-self.n_samples :] list_cov = self.list_cov[-self.n_samples :] # MANOVA formula means_stack = np.stack(list_means) self.means = np.mean(means_stack, axis=0) cov_stack = np.stack(list_cov) cov_intragroup = np.mean(cov_stack, axis=0) if len(list_means) == 1: cov_intergroup = np.zeros(cov_intragroup.shape) else: cov_intergroup = np.cov(means_stack, bias=True, rowvar=False) self.cov = cov_intragroup + cov_intergroup self.cov_inv = np.linalg.pinv(self.cov)
[docs] def fit_parameters_with_missingness(self, X: NDArray): """Fit the first estimation of the model parameters. It is based on data with missing values. Parameters ---------- X : NDArray Data matrix with missingness """ self.means, self.cov = utils.nan_mean_cov(X) self.cov_inv = np.linalg.pinv(self.cov)
[docs] def set_parameters(self, means: NDArray, cov: NDArray): """Set the model parameters from a user value. Parameters ---------- means : NDArray Specified value for the mean vector cov : NDArray Specified value for the covariance matrix """ self.means = means self.cov = cov self.cov_inv = np.linalg.pinv(self.cov)
def _maximize_likelihood(self, X: NDArray, mask_na: NDArray) -> NDArray: """Get the argmax of a posterior distribution. Parameters ---------- X : NDArray Input DataFrame without missingness mask_na : NDArray Boolean dataframe indicating which coefficients should be resampled, and are therefore the variables of the optimization Returns ------- NDArray DataFrame with imputed values. """ X_center = X - self.means X_imputed = _conjugate_gradient(self.cov_inv, X_center, mask_na) X_imputed = self.means + X_imputed return X_imputed
[docs] def init_imputation(self, X: NDArray) -> NDArray: """First simple imputation before iterating. Parameters ---------- X : NDArray Data matrix, with missing values Returns ------- NDArray Imputed matrix """ return utils.impute_nans(X, method="median")
def _check_convergence(self) -> bool: """Check if the EM algorithm has converged. Three criteria: 1) if the differences between the estimates of the parameters (mean and covariance) is less than a threshold (min_diff_reached - tolerance). 2) if the difference of the consecutive differences of the estimates is less than a threshold, i.e. stagnates over the last 5 interactions (min_diff_stable - stagnation_threshold). 3) if the likelihood of the data no longer increases, i.e. stagnates over the last 5 iterations (max_loglik - stagnation_loglik). Returns ------- bool True/False if the algorithm has converged """ list_means = self.dict_criteria_stop["means"] list_covs = self.dict_criteria_stop["covs"] list_logliks = self.dict_criteria_stop["logliks"] n_iter = len(list_means) if n_iter < 3: return False min_diff_means1 = max_diff_Linf(list_means, n_steps=1) min_diff_covs1 = max_diff_Linf(list_covs, n_steps=1) min_diff_reached = min_diff_means1 < self.tolerance and min_diff_covs1 < self.tolerance if min_diff_reached: return True if n_iter < 7: return False min_diff_means5 = max_diff_Linf(list_means, n_steps=5) min_diff_covs5 = max_diff_Linf(list_covs, n_steps=5) min_diff_stable = ( min_diff_means5 < self.stagnation_threshold and min_diff_covs5 < self.stagnation_threshold ) min_diff_loglik5_ord1 = max_diff_Linf(list_logliks, n_steps=5) min_diff_loglik5_ord2 = max_diff_Linf(list_logliks, n_steps=5, order=2) max_loglik = (min_diff_loglik5_ord1 < self.stagnation_loglik) or ( min_diff_loglik5_ord2 < self.stagnation_loglik ) return min_diff_stable or max_loglik
[docs]class VARpEM(EM): """VAR(p) EM imputer. Imputation of missing values using a vector autoregressive model through EM optimization and using a projected Ornstein-Uhlenbeck process. Equations and notations and from the following reference, matrices are transposed for consistency: Lütkepohl (2005) New Introduction to Multiple Time Series Analysis X^n+1 = nu + sum_k A_k^T @ X_k^n + G_n @ S Parameters ---------- method : Literal["mle", "sample"] Method for imputation, choose among "sample" or "mle". max_iter_em : int, optional Maximum number of steps in the EM algorithm n_iter_ou : int, optional Number of iterations for the Gibbs sampling method (+ noise addition), necessary for convergence, by default 50. ampli : float, optional Whether to sample the posterior (1) or to maximise likelihood (0), by default 1. random_state : int, optional The seed of the pseudo random number generator to use, for reproducibility. dt : float Process integration time step, a large value increases the sample bias and can make the algorithm unstable, but compensates for a smaller n_iter_ou. By default, 2e-2. tolerance : float, optional Threshold below which a L infinity norm difference indicates the convergence of the parameters stagnation_threshold : float, optional Threshold below which a L infinity norm difference indicates the convergence of the parameters stagnation_loglik : float, optional Threshold below which an absolute difference of the log likelihood indicates the convergence of the parameters period : int, optional Integer used to fold the temporal data periodically verbose: bool default `False` Attributes ---------- X_intermediate : list List of pd.DataFrame giving the results of the EM process as function of the iteration number. Examples -------- >>> import numpy as np >>> from qolmat.imputations.em_sampler import VARpEM >>> imputer = VARpEM(method="sample", random_state=11) >>> X = np.array([[1, 1, 1, 1], [np.nan, np.nan, 3, 2], [1, 2, 2, 1], [2, 2, 2, 2]]) >>> imputer.fit_transform(X) # doctest: +SKIP """
[docs] def __init__( self, method: Literal["mle", "sample"] = "sample", max_iter_em: int = 200, n_iter_ou: int = 50, ampli: float = 1, random_state: RandomSetting = None, dt: float = 2e-2, tolerance: float = 1e-4, stagnation_threshold: float = 5e-3, stagnation_loglik: float = 2, period: int = 1, verbose: bool = False, p: Union[None, int] = None, max_lagp: int = 2, ) -> None: super().__init__( method=method, max_iter_em=max_iter_em, n_iter_ou=n_iter_ou, ampli=ampli, random_state=random_state, dt=dt, tolerance=tolerance, stagnation_threshold=stagnation_threshold, stagnation_loglik=stagnation_loglik, period=period, verbose=verbose, ) self.dict_criteria_stop = {"logliks": [], "S": [], "B": []} self.p_to_fit = False self.p = p # type: ignore #noqa self.max_lagp = max_lagp if self.p is None: self.p_to_fit = True
[docs] def get_loglikelihood(self, X: NDArray) -> float: """Get the log-likelihood. Value of the log-likelihood up to a constant for the provided X, using the attributes `nu`, `B` and `S` for the VAR(p) distribution. Parameters ---------- X : NDArray Input matrix with variables in column Returns ------- float Computed value """ Z, Y = utils.create_lag_matrices(X, self.p) U = Y - Z @ self.B return -(U @ self.S_inv * U).sum().sum() / 2
[docs] def gradient_X_loglik(self, X: NDArray) -> NDArray: """Compute the gradient of the log-likelihood for the provided X. It uses the attributes `means` and `cov_inv` for the VAR(p) distribution. Parameters ---------- X : NDArray Input matrix with variables in column Returns ------- NDArray The gradient of the log-likelihood with respect to the input variable `X`. """ n_rows, n_cols = X.shape Z, Y = utils.create_lag_matrices(X, p=self.p) U = Y - Z @ self.B grad_1 = np.zeros(X.shape) grad_1[self.p :, :] = -U @ self.S_inv grad_2 = np.zeros(X.shape) for lag in range(self.p): A = self.B[1 + lag * n_cols : 1 + (lag + 1) * n_cols, :] grad_2[self.p - lag - 1 : -lag - 1, :] += U @ self.S_inv @ A.T return grad_1 + grad_2
[docs] def get_gamma(self, n_cols: int) -> NDArray: """Compute gamma. If the noise matrix is not full-rank, defines the projection matrix keeping the sampling process in the relevant subspace. Rescales the process to avoid instabilities. Parameters ---------- n_cols : int Number of variables in the data matrix Returns ------- NDArray Gamma matrix """ U, diag, Vt = spl.svd(self.S) diag_trunc = np.where(diag < self.min_std**2, 0, diag) diag_trunc = np.where(diag_trunc == 0, 0, np.min(diag_trunc)) gamma = (U * diag_trunc) @ Vt # gamma = np.eye(len(self.cov)) return gamma
[docs] def update_criteria_stop(self, X: NDArray): """Update the variable to compute the stopping criteria. Parameters ---------- X : NDArray Input matrix with variables in column """ self.loglik = self.get_loglikelihood(X) self.dict_criteria_stop["S"].append(self.list_S[-1]) self.dict_criteria_stop["B"].append(self.list_B[-1]) self.dict_criteria_stop["logliks"].append(self.loglik)
[docs] def reset_learned_parameters(self): """Reset lists of parameters before starting a new estimation.""" self.list_ZZ = [] self.list_ZY = [] self.list_B = [] self.list_S = [] self.list_YY = []
[docs] def update_parameters(self, X: NDArray) -> None: """Retain statistics relative to the current sample. Parameters ---------- X : NDArray Input matrix with variables in column """ Z, Y = utils.create_lag_matrices(X, self.p) n_obs = len(Z) ZZ = Z.T @ Z / n_obs ZZ_inv = np.linalg.pinv(ZZ) ZY = Z.T @ Y / n_obs B = ZZ_inv @ ZY U = Y - Z @ B S = U.T @ U / n_obs YY = Y.T @ Y / n_obs self.list_ZZ.append(ZZ) self.list_ZY.append(ZY) self.list_B.append(B) self.list_S.append(S) self.list_YY.append(YY)
[docs] def combine_parameters(self) -> None: """Combine statistics computed for each sample in the update step. The estimation of `nu` and `B` corresponds to the MLE, whereas `S` is approximated. """ list_ZZ = self.list_ZZ[-self.n_samples :] list_ZY = self.list_ZY[-self.n_samples :] list_YY = self.list_YY[-self.n_samples :] stack_ZZ = np.stack(list_ZZ) self.ZZ = np.mean(stack_ZZ, axis=0) stack_ZY = np.stack(list_ZY) self.ZY = np.mean(stack_ZY, axis=0) self.ZZ_inv = np.linalg.pinv(self.ZZ) self.B = self.ZZ_inv @ self.ZY stack_YY = np.stack(list_YY) self.YY = np.mean(stack_YY, axis=0) self.S = self.YY - self.ZY.T @ self.B - self.B.T @ self.ZY + self.B.T @ self.ZZ @ self.B self.S[np.abs(self.S) < 1e-12] = 0 self.S_inv = np.linalg.pinv(self.S, rcond=1e-10)
[docs] def set_parameters(self, B: NDArray, S: NDArray): """Set the model parameters from a user value. Parameters ---------- B : NDArray Specified value for the autoregression matrix S : NDArray Specified value for the noise covariance matrix """ self.B = B self.S = S self.S_inv = np.linalg.pinv(self.S)
[docs] def init_imputation(self, X: NDArray) -> NDArray: """First simple imputation before iterating. Parameters ---------- X : NDArray Data matrix, with missing values Returns ------- NDArray Imputed matrix """ return utils.linear_interpolation(X)
[docs] def pretreatment(self, X, mask_na) -> Tuple[NDArray, NDArray]: """Pretreat the data before imputation by EM, making it more robust. In the case of the VAR(p) model we freeze the naive imputation on the first observations if all variables are missing to avoid explosive imputations. Parameters ---------- X : NDArray Data matrix without nans mask_na : NDArray Boolean matrix indicating which entries are to be imputed Returns ------- Tuple[NDArray, NDArray] A tuple containing: - X the pretreated data matrix - mask_na the updated mask """ if self.p == 0: return X, mask_na mask_na = mask_na.copy() n_holes_left = int(np.sum(~np.cumsum(~mask_na, axis=0).any(axis=1))) mask_na[:n_holes_left] = False return X, mask_na
def _check_convergence(self) -> bool: """Check if the EM algorithm has converged. Three criteria: 1) if the differences between the estimates of the parameters (mean and covariance) is less than a threshold (min_diff_reached - tolerance). OR 2) if the difference of the consecutive differences of the estimates is less than a threshold, i.e. stagnates over the last 5 interactions (min_diff_stable - stagnation_threshold). OR 3) if the likelihood of the data no longer increases, i.e. stagnates over the last 5 iterations (max_loglik - stagnation_loglik). Returns ------- bool True/False if the algorithm has converged """ list_B = self.dict_criteria_stop["B"] list_S = self.dict_criteria_stop["S"] list_logliks = self.dict_criteria_stop["logliks"] n_iter = len(list_B) if n_iter < 3: return False min_diff_B1 = max_diff_Linf(list_B, n_steps=1) min_diff_S1 = max_diff_Linf(list_S, n_steps=1) min_diff_reached = min_diff_B1 < self.tolerance and min_diff_S1 < self.tolerance if min_diff_reached: return True if n_iter < 7: return False min_diff_B5 = max_diff_Linf(list_B, n_steps=5) min_diff_S5 = max_diff_Linf(list_S, n_steps=5) min_diff_stable = ( min_diff_B5 < self.stagnation_threshold and min_diff_S5 < self.stagnation_threshold ) max_loglik5_ord1 = max_diff_Linf(list_logliks, n_steps=5, order=1) max_loglik5_ord2 = max_diff_Linf(list_logliks, n_steps=5, order=2) max_loglik = (max_loglik5_ord1 < self.stagnation_loglik) or ( max_loglik5_ord2 < self.stagnation_loglik ) return min_diff_stable or max_loglik