# Source code for qolmat.imputations.imputers

"""Script for the imputers."""

import copy
import warnings
from abc import abstractmethod
from functools import partial
from typing import Any, Callable, Dict, Literal, Optional, Tuple, Union

import numpy as np
import pandas as pd
import sklearn as skl
from numpy.typing import NDArray
from sklearn import utils as sku
from sklearn.base import BaseEstimator
from sklearn.experimental import enable_iterative_imputer  # noqa
from sklearn.impute import IterativeImputer, KNNImputer
from sklearn.impute._base import _BaseImputer
from statsmodels.tsa import seasonal as tsa_seasonal

from qolmat.imputations import em_sampler, softimpute
from qolmat.imputations.rpca import rpca_noisy, rpca_pcp
from qolmat.utils import utils
from qolmat.utils.exceptions import NotDataFrame
from qolmat.utils.utils import RandomSetting


class _Imputer(_BaseImputer):
    """Base class for all imputers.

    Parameters
    ----------
    columnwise : bool, optional
        If True, the imputer will be computed for each column, else it will be
        computed on the whole dataframe, by default False
    shrink : bool, optional
        Indicates if the element-wise imputation method returns a single value,
        by default False
    random_state : RandomSetting, optional
        Controls the randomness of the fit_transform, by default None
    imputer_params: Tuple[str, ...]
        List of parameters of the imputer, which can be specified globally or
        columnwise
    groups: Tuple[str, ...]
        List of column names to group by, by default []

    """

    def __init__(
        self,
        columnwise: bool = False,
        shrink: bool = False,
        random_state: RandomSetting = None,
        imputer_params: Tuple[str, ...] = (),
        groups: Tuple[str, ...] = (),
    ):
        """Store the configuration of the imputer without any processing.

        Every argument is kept as an attribute of the same name. The marker
        used for missing values is always `np.nan`.
        """
        # Marker of missing entries; not configurable through the constructor.
        self.missing_values = np.nan
        # How the imputation is organised: by column and/or by group.
        self.columnwise = columnwise
        self.groups = groups
        self.shrink = shrink
        # Names of the attributes exposed as hyperparameters.
        self.imputer_params = imputer_params
        self.random_state = random_state

    def get_hyperparams(self, col: Optional[str] = None):
        """Collect the hyperparameters that apply to a given column.

        The names listed in `self.imputer_params` are read as attributes of
        the imputer. A name without "/" is a global hyperparameter. A name of
        the form `name_param/column` is only kept when `column` equals `col`,
        in which case it is stored under `name_param` and replaces any value
        already collected for that name. A global value never replaces a
        value that has already been collected.

        Parameters
        ----------
        col : str, optional
            Column for which the hyperparameters are requested. If None,
            column-specific entries are ignored.

        Returns
        -------
        dict
            Mapping from hyperparameter name to its value.

        """
        selected = {}
        for param_key in self.imputer_params:
            param_value = getattr(self, param_key)
            if "/" in param_key:
                if col is None:
                    continue
                base_name, target_col = param_key.split("/")
                if target_col == col:
                    selected[base_name] = param_value
            else:
                # Keep a column-specific value collected earlier, if any.
                selected.setdefault(param_key, param_value)
        return selected

    def _check_dataframe(self, X: NDArray):
        """Ensure that `X` is a pandas dataframe.

        Parameters
        ----------
        X : NDArray
            Object to check.

        Raises
        ------
        NotDataFrame
            If `X` is not a pandas.DataFrame. The type of `X` is passed to the
            exception.

        """
        if isinstance(X, pd.DataFrame):
            return
        raise NotDataFrame(type(X))

    def __sklearn_tags__(self):
        """Return the estimator tags of the imputer.

        The tags of the parent class are adjusted to declare that missing
        values are accepted in the input, that the target is not restricted to
        a single output and that the results are not deterministic.
        """
        tags = super().__sklearn_tags__()
        tags.non_deterministic = True
        tags.target_tags.single_output = False
        tags.input_tags.allow_nan = True
        return tags

    def fit(self, X: pd.DataFrame, y: pd.DataFrame = None) -> "_Imputer":
        """Fit the imputer on X.

        The input is validated, the column names and the group identifier of
        each row are stored, then the imputer is fitted either column by
        column (`columnwise=True`) or on the whole dataframe.

        Parameters
        ----------
        X : pd.DataFrame
            Data matrix on which the Imputer must be fitted.
        y : pd.DataFrame
            Not used.

        Returns
        -------
        self : Self
            Returns self.

        Raises
        ------
        ValueError
            If a column of X only contains missing values.

        """
        sku.validation.validate_data(
            self,
            X,
            ensure_all_finite="allow-nan",
            dtype=["float", "int", "string", "categorical", "object"],
        )
        df = utils._validate_input(X)
        self.n_features_in_ = len(df.columns)

        if any(df[name].isnull().all() for name in df):
            raise ValueError("Input contains a column full of NaN")

        self.columns_ = tuple(df.columns)
        self._rng = sku.check_random_state(self.random_state)
        # Share the random generator with the wrapped estimator, if there is
        # one and it exposes a `random_state` attribute.
        if hasattr(self, "estimator"):
            if hasattr(self.estimator, "random_state"):
                self.estimator.random_state = self._rng

        # One integer identifier per row; a single group 0 without `groups`.
        if self.groups:
            group_ids = df.groupby(list(self.groups)).ngroup()
        else:
            group_ids = pd.Series(0, index=df.index)
        self.ngroups_ = group_ids.rename("_ngroup")

        self._setup_fit()
        if not self.columnwise:
            self._fit_allgroups(df)
            return self

        for name in df.columns:
            self._fit_allgroups(df[[name]], col=name)
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Impute the missing values of `X`.

        The result has the same shape as `X`: observed values are unchanged
        and nans are replaced. Depending on the imputer parameters, the
        imputation is done columnwise and/or groupwise. Only the columns
        containing nans are processed in the columnwise setting. A numpy
        array in input gives a numpy array in output, but the use of pandas
        dataframes is advised.

        Parameters
        ----------
        X : pd.DataFrame
            Dataframe to impute.

        Returns
        -------
        pd.DataFrame
            Imputed dataframe.

        Raises
        ------
        ValueError
            If the columns differ from the ones seen in fit, or if a column
            only contains missing values.

        """
        sku.validation.validate_data(
            self,
            X,
            ensure_all_finite="allow-nan",
            dtype=["float", "int", "string", "categorical", "object"],
            reset=False,
        )
        df = utils._validate_input(X)
        if tuple(df.columns) != self.columns_:
            raise ValueError(
                """The number of features is different
                from the counterpart in fit.
                Reshape your data"""
            )

        if any(df[name].isnull().all() for name in df):
            raise ValueError("Input contains a column full of NaN")

        nan_columns = df.columns[df.isna().any()]
        if nan_columns.empty:
            # Nothing to impute: the validated input is returned as is.
            result = df
        elif self.columnwise:
            result = df.copy()
            for name in nan_columns:
                result[name] = self._transform_allgroups(df[[name]], col=name)
        else:
            result = self._transform_allgroups(df)

        # Give back the container type received in input.
        return result.to_numpy() if isinstance(X, np.ndarray) else result

    def fit_transform(self, X: pd.DataFrame, y: pd.DataFrame = None) -> pd.DataFrame:
        """Fit the imputer on `X`, then impute `X`.

        This is `fit` followed by `transform` on the same data: the result
        has the same shape as `X`, with unchanged observed values and nans
        replaced, using columnwise and/or groupwise methods depending on the
        imputer parameters.

        Parameters
        ----------
        X : pd.DataFrame
            Dataframe to impute.
        y : pd.DataFrame
            Not used.

        Returns
        -------
        pd.DataFrame
            Imputed dataframe.

        """
        self.fit(X)
        imputed = self.transform(X)
        return imputed

    def _fit_transform_fallback(self, df: pd.DataFrame) -> pd.DataFrame:
        """Impute the remaining missing values of `df` column by column.

        Numeric columns are filled with their median, the other columns with
        their most frequent value. A column without any observed value is
        left untouched, since no statistic can be computed for it.

        This can introduce data leakage for forward imputers if unchecked.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe with missing values. It is modified in place.

        Returns
        -------
        pd.DataFrame
            The same dataframe `df`, imputed by the median (numeric columns)
            or the mode (other columns) of each column.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        for col in df.columns[df.isna().any()]:
            column = df[col]
            if pd.api.types.is_numeric_dtype(column):
                fill_value = column.median()
            else:
                modes = column.mode()
                if modes.empty:
                    # No observed value: nothing sensible to impute with.
                    continue
                fill_value = modes.iloc[0]
            df[col] = column.fillna(fill_value)
        return df

    def _fit_allgroups(self, df: pd.DataFrame, col: str = "__all__") -> "_Imputer":
        """Fit the imputer on every group of `df`.

        `df` is either a single column (columnwise setting) or the whole
        dataframe. The fitted objects returned by `_fit_element` are stored
        in `self._dict_fitting[col]`, keyed by group identifier. Without
        groups, a single entry with key 0 is stored.

        Parameters
        ----------
        df : pd.DataFrame
            Input dataframe
        col : str, optional
            Column on which the imputer is fitted, by default "__all__"

        Returns
        -------
        Self
            Returns self.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        fit_one = partial(self._fit_element, col=col)
        if not self.groups:
            self._dict_fitting[col] = {0: fit_one(df)}
            return self

        grouped = df.groupby(self.ngroups_, group_keys=False)
        self._dict_fitting[col] = grouped.apply(fit_one).to_dict()
        return self

    def _setup_fit(self) -> None:
        """Reset the registry of fitted objects before looping over columns."""
        fitted_objects: Dict[str, Any] = {}
        self._dict_fitting = fitted_objects

    def _apply_groupwise(self, fun: Callable, df: pd.DataFrame, **kwargs) -> Any:
        """Apply `fun` to `df`, group by group when groups are defined.

        Without groups, `fun` is called once on the whole dataframe. With
        groups, the rows are grouped by `self.ngroups_` and `fun` is applied
        through `groupby.transform` if `self.shrink` is True, through
        `groupby.apply` otherwise.

        Parameters
        ----------
        fun : Callable
            Function applied to the dataframe, called with `kwargs`
        df : pd.DataFrame
            Dataframe on which the function is applied
        **kwargs: dict
            Additional keyword arguments forwarded to `fun`

        Returns
        -------
        Any
            Depends on the function signature

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        bound_fun = partial(fun, **kwargs)
        if not self.groups:
            return bound_fun(df)

        grouped = df.groupby(self.ngroups_, group_keys=False)
        if self.shrink:
            return grouped.transform(bound_fun)
        return grouped.apply(bound_fun)

    def _transform_allgroups(self, df: pd.DataFrame, col: str = "__all__") -> pd.DataFrame:
        """Impute `df`.

        It does it by applying the specialized method `_transform_element`
        on each group, if groups have been given. The values obtained are
        only used to fill the missing entries of a copy of `df`. If nans
        remain afterwards, `_fit_transform_fallback` is called on the result
        in order to return a dataframe without nan.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"

        Returns
        -------
        pd.DataFrame
            Imputed dataframe or column

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        df_work = df.copy()
        candidates = self._apply_groupwise(self._transform_element, df_work, col=col)
        df_work = df_work.fillna(candidates)

        # Last resort for entries the specialized method could not impute.
        if df_work.isna().any().any():
            fallback_values = self._fit_transform_fallback(df_work)
            df_work = df_work.fillna(fallback_values)

        return df_work

    @abstractmethod
    def _fit_element(self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0) -> Any:
        """Fit the imputer on one element of the data.

        An element is the part of the data restricted to a group and/or to a
        column, depending on self.groups and self.columnwise. This base
        implementation only checks the input type.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe on which the imputer is fitted
        col : str, optional
            Column on which the imputer is fitted, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        Any
            Returns self in this base implementation.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        # Only the type check is shared; subclasses provide the fitting.
        self._check_dataframe(df)
        return self

    @abstractmethod
    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Transform one element of the data.

        An element is the part of the data restricted to a group and/or to a
        column, depending on self.groups and self.columnwise. This base
        implementation only checks the input type and returns `df` unchanged.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            The input dataframe in this base implementation.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        # Only the type check is shared; subclasses provide the imputation.
        self._check_dataframe(df)
        return df


class ImputerOracle(_Imputer):
    """Perfect imputer, which requires knowing the real values.

    Used as a reference to evaluate imputation metrics. The constructor takes
    no argument: the real values are provided afterwards with `set_solution`.

    """

    def __init__(
        self,
    ) -> None:
        super().__init__()

    def set_solution(self, df: pd.DataFrame):
        """Set the true values to be returned by the oracle.

        Parameters
        ----------
        df : pd.DataFrame
            True dataset, used to fill the missing values in `transform`.

        """
        self.df_solution = df

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Impute X with the corresponding known values.

        If no solution has been set, a warning is emitted and the missing
        values are replaced by zeros.

        Parameters
        ----------
        X : pd.DataFrame
            dataframe to impute

        Returns
        -------
        pd.DataFrame
            dataframe imputed with the known values

        Raises
        ------
        ValueError
            If the columns differ from the ones seen in fit.

        """
        sku.validation.validate_data(
            self,
            X,
            ensure_all_finite="allow-nan",
            dtype=["float", "int", "string", "categorical", "object"],
            reset=False,
        )
        df = utils._validate_input(X)
        if tuple(df.columns) != self.columns_:
            raise ValueError(
                """The number of features is different
                from the counterpart in fit.
                Reshape your data"""
            )

        if not hasattr(self, "df_solution"):
            warnings.warn("OracleImputer not initialized! Returning imputation with zeros")
            df_imputed = df.fillna(0)
        else:
            df_imputed = df.fillna(self.df_solution)

        # Give back the container type received in input.
        return df_imputed.to_numpy() if isinstance(X, np.ndarray) else df_imputed
class ImputerSimple(_Imputer):
    """Simple imputer.

    Impute each column by its mean, its median or its mode (if it is
    categorical).

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []
    strategy : str
        Strategy given to the scikit-learn SimpleImputer for numeric columns,
        by default "median". Non-numeric columns always use "most_frequent".

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from qolmat.imputations import imputers
    >>> imputer = imputers.ImputerSimple()
    >>> df = pd.DataFrame(
    ...     data=[
    ...         [1, 1, 1, 1],
    ...         [np.nan, np.nan, np.nan, np.nan],
    ...         [1, 2, 2, 5],
    ...         [2, 2, 2, 2],
    ...     ],
    ...     columns=["var1", "var2", "var3", "var4"],
    ... )
    >>> imputer.fit_transform(df)
       var1  var2  var3  var4
    0   1.0   1.0   1.0   1.0
    1   1.0   2.0   2.0   2.0
    2   1.0   2.0   2.0   5.0
    3   2.0   2.0   2.0   2.0

    """

    def __init__(self, groups: Tuple[str, ...] = (), strategy="median") -> None:
        super().__init__(groups=groups, columnwise=True, shrink=False)
        self.strategy = strategy

    def _fit_element(self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0) -> Any:
        """Fit a SimpleImputer on the column `col` of `df`.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe on which the imputer is fitted
        col : str, optional
            Column on which the imputer is fitted, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        Any
            Fitted SimpleImputer model

        """
        is_numeric = pd.api.types.is_numeric_dtype(df[col])
        strategy = self.strategy if is_numeric else "most_frequent"
        return skl.impute.SimpleImputer(strategy=strategy).fit(df[[col]])

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Impute `df` with the model stored for `col` and `ngroup`.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe, with the columns and index of `df`.

        """
        fitted_model = self._dict_fitting[col][ngroup]
        # NOTE: the stored model is fitted again on `df` before imputing.
        imputed_array = fitted_model.fit_transform(df)
        return pd.DataFrame(data=imputed_array, columns=df.columns, index=df.index)
class ImputerShuffle(_Imputer):
    """Impute using random samples from the considered column.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []
    random_state : RandomSetting, optional
        Determine the randomness of the imputer, by default None

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from qolmat.imputations import imputers
    >>> imputer = imputers.ImputerShuffle(random_state=42)
    >>> df = pd.DataFrame(
    ...     data=[
    ...         [1, 1, 1, 1],
    ...         [np.nan, np.nan, np.nan, np.nan],
    ...         [1, 2, 2, 5],
    ...         [2, 2, 2, 2],
    ...     ],
    ...     columns=["var1", "var2", "var3", "var4"],
    ... )
    >>> imputer.fit_transform(df)
       var1  var2  var3  var4
    0   1.0   1.0   1.0   1.0
    1   2.0   1.0   2.0   2.0
    2   1.0   2.0   2.0   5.0
    3   2.0   2.0   2.0   2.0

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
        random_state: RandomSetting = None,
    ) -> None:
        super().__init__(groups=groups, columnwise=True, random_state=random_state)

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Fill the missing values of a column with draws from its observed values.

        The draws are made with replacement using `self._rng`. Only the first
        column of `df` is processed, since the imputer is columnwise. The
        input dataframe is not modified.

        Parameters
        ----------
        df : pd.DataFrame
            Column to impute, as a one-column dataframe
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed column. If `df` has no observed value, it is returned
            unchanged.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        if df.isna().all().all():
            # Nothing to sample from.
            return df

        name = df.columns[0]
        # Work on a copy: assigning into a column selected from `df` would
        # modify the caller's data and trigger chained-assignment warnings.
        values = df[name].copy()
        is_missing = values.isna()
        observed = values.dropna()
        samples = self._rng.choice(observed, int(is_missing.sum()), replace=True)
        values[is_missing] = samples
        return values.to_frame()
class ImputerLOCF(_Imputer):
    """LOCF imputer.

    It imputes by the last available value of the column. Relevant for time
    series. If the first observations are missing, they are imputed by a NOCB.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from qolmat.imputations import imputers
    >>> imputer = imputers.ImputerLOCF()
    >>> df = pd.DataFrame(
    ...     data=[
    ...         [1, 1, 1, 1],
    ...         [np.nan, np.nan, np.nan, np.nan],
    ...         [1, 2, 2, 5],
    ...         [2, 2, 2, 2],
    ...     ],
    ...     columns=["var1", "var2", "var3", "var4"],
    ... )
    >>> imputer.fit_transform(df)
       var1  var2  var3  var4
    0   1.0   1.0   1.0   1.0
    1   1.0   1.0   1.0   1.0
    2   1.0   2.0   2.0   5.0
    3   2.0   2.0   2.0   2.0

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
    ) -> None:
        super().__init__(groups=groups, columnwise=True)

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Forward fill, then backward fill, each column of `df`.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed copy of `df`

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        df_filled = df.copy()
        for name in df:
            # The backward fill only affects leading missing values.
            df_filled[name] = df[name].ffill().bfill()
        return df_filled
class ImputerNOCB(_Imputer):
    """NOCB imputer.

    Impute by the next available value of the column. Relevant for time
    series. If the last observation is missing, it is imputed by a LOCF.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from qolmat.imputations import imputers
    >>> imputer = imputers.ImputerNOCB()
    >>> df = pd.DataFrame(
    ...     data=[
    ...         [1, 1, 1, 1],
    ...         [np.nan, np.nan, np.nan, np.nan],
    ...         [1, 2, 2, 5],
    ...         [2, 2, 2, 2],
    ...     ],
    ...     columns=["var1", "var2", "var3", "var4"],
    ... )
    >>> imputer.fit_transform(df)
       var1  var2  var3  var4
    0   1.0   1.0   1.0   1.0
    1   1.0   2.0   2.0   5.0
    2   1.0   2.0   2.0   5.0
    3   2.0   2.0   2.0   2.0

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
    ) -> None:
        super().__init__(groups=groups, columnwise=True)

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Backward fill, then forward fill, each column of `df`.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed copy of `df`

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        df_filled = df.copy()
        for name in df:
            # The forward fill only affects trailing missing values.
            df_filled[name] = df[name].bfill().ffill()
        return df_filled
class ImputerInterpolation(_Imputer):
    """Interpolation imputer.

    This class implements a way to impute time series using some
    interpolation strategies supported by pd.Series.interpolate, such as
    "linear", "slinear", "quadratic", ... By default, linear interpolation.
    As for pd.Series.interpolate, if "method" is "spline" or "polynomial",
    an "order" has to be passed.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []
    method : Optional[str] = "linear"
        name of the method for interpolation: "linear", "cubic", "spline",
        "slinear", ... see pd.Series.interpolate for more example.
        By default, the value is set to "linear".
    order : Optional[int]
        order for the spline interpolation
    col_time : Optional[str]
        Name of the index level representing the time to use for the
        interpolation. If None, the position of the rows is used.

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from qolmat.imputations import imputers
    >>> imputer = imputers.ImputerInterpolation(method="spline", order=2)
    >>> df = pd.DataFrame(
    ...     data=[
    ...         [1, 1, 1, 1],
    ...         [np.nan, np.nan, np.nan, np.nan],
    ...         [1, 2, 2, 5],
    ...         [2, 2, 2, 2],
    ...     ],
    ...     columns=["var1", "var2", "var3", "var4"],
    ... )
    >>> imputer.fit_transform(df)
           var1      var2      var3      var4
    0  1.000000  1.000000  1.000000  1.000000
    1  0.666667  1.666667  1.666667  4.666667
    2  1.000000  2.000000  2.000000  5.000000
    3  2.000000  2.000000  2.000000  2.000000

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
        method: str = "linear",
        order: Optional[int] = None,
        col_time: Optional[str] = None,
    ) -> None:
        super().__init__(imputer_params=("method", "order"), groups=groups, columnwise=True)
        self.method = method
        self.order = order
        self.col_time = col_time

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Interpolate the missing values of `df`.

        The interpolation runs on a temporary index: the row positions if
        `col_time` is None, the level `col_time` of the index otherwise.
        Values left missing at the edges are forward then backward filled.
        The original index is restored on the result, and the input dataframe
        is not modified.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe, with the same index as `df`

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        hyperparams = self.get_hyperparams(col=col)
        original_index = df.index
        if self.col_time is None:
            df_indexed = df.reset_index(drop=True)
        else:
            # Re-index a copy: assigning to `df.index` would also change the
            # caller's dataframe, which later aligns the imputed values on
            # its index.
            df_indexed = df.copy()
            df_indexed.index = original_index.get_level_values(self.col_time)

        df_imputed = df_indexed.interpolate(**hyperparams)
        df_imputed = df_imputed.ffill().bfill()
        df_imputed.index = original_index
        return df_imputed
class ImputerResiduals(_Imputer):
    """Residual imputer.

    This class implements an imputation method based on a seasonal
    decomposition. The series are de-seasonalised, de-trended, residuals are
    imputed, then residuals are re-seasonalised and re-trended.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []
    period : int
        Period of the series. Must be used if x is not a pandas object or if
        the index of x does not have a frequency. Overrides default
        periodicity of x if x is a pandas object with a timeseries index.
    model_tsa : Optional[str]
        Type of seasonal component "additive" or "multiplicative".
        Abbreviations are accepted. By default, the value is set to
        "additive"
    extrapolate_trend : int or 'freq', optional
        If set to > 0, the trend resulting from the convolution is linear
        least-squares extrapolated on both ends (or the single one if
        two_sided is False) considering this many (+1) closest points. If set
        to 'freq', use `freq` closest points. Setting this parameter results
        in no NaN values in trend or resid components.
    method_interpolation : str
        method for the residuals interpolation

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from qolmat.imputations.imputers import ImputerResiduals
    >>> np.random.seed(100)
    >>> df = pd.DataFrame(index=pd.date_range("2015-01-01", "2020-01-01"))
    >>> mean = 5
    >>> offset = 10
    >>> df["y"] = np.cos(df.index.dayofyear / 365 * 2 * np.pi - np.pi) * mean + offset
    >>> trend = 5
    >>> df["y"] = df["y"] + trend * np.arange(0, df.shape[0]) / df.shape[0]
    >>> noise_mean = 0
    >>> noise_var = 2
    >>> df["y"] = df["y"] + np.random.normal(noise_mean, noise_var, df.shape[0])
    >>> mask = np.random.choice([True, False], size=df.shape)
    >>> df = df.mask(mask)
    >>> imputor = ImputerResiduals(period=365, model_tsa="additive")
    >>> imputor.fit_transform(df)
                        y
    2015-01-01   1.501210
    2015-01-02   5.691061
    2015-01-03   4.404106
    2015-01-04   3.531540
    2015-01-05   3.129532
    ...               ...
    2019-12-28  10.288054
    2019-12-29  10.632659
    2019-12-30  14.900671
    2019-12-31  12.957837
    2020-01-01  12.780517
    <BLANKLINE>
    [1827 rows x 1 columns]

    """

    def __init__(
        self,
        period: int = 1,
        groups: Tuple[str, ...] = (),
        model_tsa: Optional[str] = "additive",
        extrapolate_trend: Optional[Union[int, str]] = "freq",
        method_interpolation: Optional[str] = "linear",
    ):
        super().__init__(
            imputer_params=(
                "model_tsa",
                "period",
                "extrapolate_trend",
                "method_interpolation",
            ),
            groups=groups,
            columnwise=True,
        )
        self.model_tsa = model_tsa
        self.period = period
        self.extrapolate_trend = extrapolate_trend
        self.method_interpolation = method_interpolation

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Impute the first column of `df` through its decomposition.

        The column is first interpolated so that it can be decomposed into
        trend, seasonality and residuals. The residuals located on originally
        missing entries are discarded and interpolated again, then the three
        components are summed.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe, with a single column.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        hyperparams = self.get_hyperparams(col=col)
        method = hyperparams["method_interpolation"]

        name = df.columns[0]
        values = df[name]
        # A complete series is needed by the decomposition.
        values_filled = values.interpolate(method=method).ffill().bfill()
        decomposition = tsa_seasonal.seasonal_decompose(
            values_filled,
            model=hyperparams["model_tsa"],
            period=hyperparams["period"],
            extrapolate_trend=hyperparams["extrapolate_trend"],
        )

        residuals = decomposition.resid
        # Residuals computed on filled-in entries are not trusted.
        residuals[values.isna()] = np.nan
        residuals = residuals.interpolate(method=method).ffill().bfill()

        recomposed = decomposition.seasonal + decomposition.trend + residuals
        return pd.DataFrame({name: recomposed})
class ImputerKNN(_Imputer):
    """K-nearest neighbors imputer.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []
    n_neighbors : int, default=5
        Number of neighbors to use by default for `kneighbors` queries.
    weights : {`uniform`, `distance`}, callable or None, default=`distance`
        Weight function used in prediction. Possible values:

        - `uniform` : uniform weights. All points in each neighborhood are
          weighted equally.
        - `distance` : weight points by the inverse of their distance. In
          this case, closer neighbors of a query point will have a greater
          influence than neighbors which are further away.
        - [callable] : a user-defined function which accepts an array of
          distances, and returns an array of the same shape containing the
          weights.

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from qolmat.imputations import imputers
    >>> imputer = imputers.ImputerKNN(n_neighbors=2)
    >>> df = pd.DataFrame(
    ...     data=[
    ...         [1, 1, 1, 1],
    ...         [np.nan, np.nan, np.nan, np.nan],
    ...         [1, 2, 2, 5],
    ...         [2, 2, 2, 2],
    ...     ],
    ...     columns=["var1", "var2", "var3", "var4"],
    ... )
    >>> imputer.fit_transform(df)
           var1      var2      var3      var4
    0  1.000000  1.000000  1.000000  1.000000
    1  1.333333  1.666667  1.666667  2.666667
    2  1.000000  2.000000  2.000000  5.000000
    3  2.000000  2.000000  2.000000  2.000000

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
        n_neighbors: int = 5,
        weights: str = "distance",
    ) -> None:
        super().__init__(
            imputer_params=("n_neighbors", "weights"),
            groups=groups,
            columnwise=False,
        )
        self.n_neighbors = n_neighbors
        self.weights = weights

    def _fit_element(self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0) -> KNNImputer:
        """Fit a KNNImputer on `df`.

        The imputer is multivariate, so it can only be fitted on all the
        columns at once.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe on which the imputer is fitted
        col : str, optional
            Must be "__all__", which is the default
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        KNNImputer
            Fitted KNN model

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.
        ValueError
            If `col` is not "__all__".

        """
        self._check_dataframe(df)
        if col != "__all__":
            raise ValueError(f"col must be '__all__', but '{col}' has been passed.")

        knn = KNNImputer(metric="nan_euclidean", **self.get_hyperparams())
        return knn.fit(df)

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Impute `df` with the KNN model stored for `ngroup`.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to impute
        col : str, optional
            Must be "__all__", which is the default
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe, with the columns and index of `df`.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.
        ValueError
            If `col` is not "__all__".

        """
        self._check_dataframe(df)
        if col != "__all__":
            raise ValueError(f"col must be '__all__', but '{col}' has been passed.")

        fitted_model = self._dict_fitting["__all__"][ngroup]
        # NOTE: the stored model is fitted again on `df` before imputing.
        imputed_array = fitted_model.fit_transform(df)
        return pd.DataFrame(data=imputed_array, columns=df.columns, index=df.index)
class ImputerMICE(_Imputer):
    """MICE imputer.

    Wrapper of the class sklearn.impute.IterativeImputer in our framework.
    This imputer relies on an estimator which is iterative.

    Parameters
    ----------
    groups : Tuple[str, ...], optional
        specific groups for groupby, by default ()
    estimator : Optional[BaseEstimator], optional
        estimator to use, by default None
    random_state : RandomSetting, optional
        random state, by default None
    sample_posterior : bool, optional
        true if sample, false otherwise, by default False
    max_iter : int, optional
        maximum number of iterations, by default 100

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
        estimator: Optional[BaseEstimator] = None,
        random_state: RandomSetting = None,
        sample_posterior=False,
        max_iter=100,
    ) -> None:
        super().__init__(
            imputer_params=("sample_posterior", "max_iter"),
            groups=groups,
            columnwise=False,
            random_state=random_state,
        )
        self.estimator = estimator
        self.sample_posterior = sample_posterior
        self.max_iter = max_iter

    def _fit_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> IterativeImputer:
        """Fit an IterativeImputer on `df`.

        The imputer is multivariate, so it can only be fitted on all the
        columns at once. The number of iterations done by the last fitted
        model is stored in `self.n_iter_`.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe on which the imputer is fitted
        col : str, optional
            Must be "__all__", which is the default
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        IterativeImputer
            Fitted iterative imputer

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.
        ValueError
            If `col` is not "__all__".

        """
        self._check_dataframe(df)
        if col != "__all__":
            raise ValueError(f"col must be '__all__', but '{col}' has been passed.")

        iterative = IterativeImputer(estimator=self.estimator, **self.get_hyperparams())
        fitted_model = iterative.fit(df)
        self.n_iter_ = fitted_model.n_iter_
        return fitted_model

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Impute `df` with the iterative imputer stored for `ngroup`.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe to impute
        col : str, optional
            Must be "__all__", which is the default
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe, with the columns and index of `df`.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.
        ValueError
            If `col` is not "__all__".

        """
        self._check_dataframe(df)
        if col != "__all__":
            raise ValueError(f"col must be '__all__', but '{col}' has been passed.")

        fitted_model = self._dict_fitting["__all__"][ngroup]
        # NOTE: the stored model is fitted again on `df` before imputing.
        imputed_array = fitted_model.fit_transform(df)
        return pd.DataFrame(data=imputed_array, columns=df.columns, index=df.index)
class ImputerRegressor(_Imputer):
    """Regressor imputer.

    This class implements a regression imputer in the multivariate case.
    It imputes each column using a single fit-predict for a given estimator,
    based on the columns which have no missing values.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []
    estimator : BaseEstimator, optional
        Estimator for imputing a column based on the others
    handler_nan : str
        Can be `fit` (or its alias `none`), `row` or `column`:
        - if `fit`, the estimator is assumed to be robust to missing values
        - if `row` all non complete rows will be removed from the train
        dataset, and will not be used for the inference,
        - if `column` all non complete columns will be ignored.
        By default, `row`
    random_state : RandomSetting, optional
        Controls the randomness of the fit_transform, by default None

    Examples
    --------
    >>> import numpy as np
    >>> import pandas as pd
    >>> from qolmat.imputations import imputers
    >>> from sklearn.ensemble import ExtraTreesRegressor
    >>> imputer = imputers.ImputerRegressor(estimator=ExtraTreesRegressor())
    >>> df = pd.DataFrame(
    ...     data=[
    ...         [1, 1, 1, 1],
    ...         [np.nan, np.nan, np.nan, np.nan],
    ...         [1, 2, 2, 5],
    ...         [2, 2, 2, 2],
    ...     ],
    ...     columns=["var1", "var2", "var3", "var4"],
    ... )
    >>> imputer.fit_transform(df)
       var1  var2  var3  var4
    0   1.0   1.0   1.0   1.0
    1   1.0   2.0   2.0   2.0
    2   1.0   2.0   2.0   5.0
    3   2.0   2.0   2.0   2.0

    """

    def __init__(
        self,
        imputer_params: Tuple[str, ...] = ("handler_nan",),
        groups: Tuple[str, ...] = (),
        estimator: Optional[BaseEstimator] = None,
        handler_nan: str = "row",
        random_state: RandomSetting = None,
    ):
        super().__init__(
            imputer_params=imputer_params,
            groups=groups,
            random_state=random_state,
        )
        self.estimator = estimator
        self.handler_nan = handler_nan

    def _fit_estimator(self, estimator, X, y) -> Any:
        """Fit `estimator` on (X, y) and return the result of `fit`."""
        return estimator.fit(X, y)

    def _predict_estimator(self, estimator, X) -> pd.Series:
        """Predict with `estimator` on X, as a Series indexed like X."""
        pred = estimator.predict(X)
        return pd.Series(pred, index=X.index)

    def get_Xy_valid(self, df: pd.DataFrame, col: str) -> Tuple[pd.DataFrame, pd.Series]:
        """Get a valid couple (X,y).

        X is made of all the columns of `df` but `col`, filtered according
        to `self.handler_nan`; y is the column `col` restricted to the rows
        kept in X. y may still contain missing values.

        Parameters
        ----------
        df : pd.DataFrame
            Input dataframe
        col : str
            column name.

        Returns
        -------
        Tuple[pd.DataFrame, pd.Series]
            Valid X and y.

        Raises
        ------
        ValueError
            If `self.handler_nan` is not one of `fit`, `none`, `row`
            or `column`.

        """
        X = df.drop(columns=col, errors="ignore")
        # "fit" is the value advertised in the class documentation, "none" is
        # the historical spelling: both leave X untouched.
        if self.handler_nan in ("none", "fit"):
            pass
        elif self.handler_nan == "row":
            X = X.loc[~X.isna().any(axis=1)]
        elif self.handler_nan == "column":
            X = X.dropna(how="any", axis=1)
        else:
            raise ValueError(
                f"Value '{self.handler_nan}' is not correct for argument `handler_nan'."
            )
        y = df.loc[X.index, col]
        return X, y

    def _fit_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> Dict[str, Optional[BaseEstimator]]:
        """Fit the imputer on `df`.

        It does it at the group and/or column level depending on
        self.groups and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe on which the imputer is fitted
        col : str, optional
            Column on which the imputer is fitted, by default "__all__".
            Only "__all__" is accepted.
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        Dict[str, Optional[BaseEstimator]]
            One entry per column of `df` containing missing values: the
            fitted regressor, or None when no training row is available.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.
        ValueError
            If `col` is different from "__all__".

        """
        self._check_dataframe(df)
        if col != "__all__":
            raise ValueError(f"col must be '__all__', but '{col}' has been passed.")
        cols_with_nans = df.columns[df.isna().any()]
        dict_estimators: Dict[str, Optional[BaseEstimator]] = {}
        for col_target in cols_with_nans:
            # Selects only the valid values in the Train Set according
            # to the chosen method
            X, y = self.get_Xy_valid(df, col_target)
            # Keeps only the rows where the target is observed: they form
            # the Train Set
            is_na = y.isna()
            X = X[~is_na]
            y = y[~is_na]
            if X.empty:
                # Nothing to train on: the column is skipped at transform
                dict_estimators[col_target] = None
                continue
            # Each column gets its own independent copy of the estimator
            estimator = copy.deepcopy(self.estimator)
            dict_estimators[col_target] = self._fit_estimator(estimator, X, y)
        return dict_estimators

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Transform the dataframe `df`.

        It does it at the group and/or column level depending on
        self.groups and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__".
            Only "__all__" is accepted.
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe. Values are left missing for the columns
            whose fitted model is None and for the rows discarded by
            `get_Xy_valid`.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.
        ValueError
            If `col` is different from "__all__".

        """
        self._check_dataframe(df)
        if col != "__all__":
            raise ValueError(f"col must be '__all__', but '{col}' has been passed.")
        df_imputed = df.copy()
        cols_with_nans = df.columns[df.isna().any()]
        for col_target in cols_with_nans:
            model = self._dict_fitting["__all__"][ngroup][col_target]
            if model is None:
                continue
            # Builds the features according to the chosen method
            X, y = self.get_Xy_valid(df, col_target)
            # Keeps only the rows where the target is missing: they form
            # the Test Set
            is_na = y.isna()
            if not np.any(is_na):
                continue
            X = X.loc[is_na]
            y_hat = self._predict_estimator(model, X)
            y_hat.index = X.index
            df_imputed.loc[X.index, col_target] = y_hat
        return df_imputed
class ImputerRpcaPcp(_Imputer):
    """PCP RPCA imputer.

    This class implements the Robust Principal Component Analysis imputation
    with Principal Component Pursuit. The imputation minimizes a loss function
    combining a low-rank criterium on the dataframe and a L1 penalization on
    the residuals.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []
    columnwise : bool
        For the RPCA method to be applied columnwise (with reshaping of
        each column into an array)
        or to be applied directly on the dataframe. By default, the value is
        set to False.
    random_state : RandomSetting, optional
        Controls the randomness of the fit_transform, by default None
    period : int
        Period given to `utils.prepare_data` when reshaping the data,
        by default 1
    mu, lam, max_iterations, tolerance, verbose
        Forwarded as is to `rpca_pcp.RpcaPcp`.

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
        columnwise: bool = False,
        random_state: RandomSetting = None,
        period: int = 1,
        mu: Optional[float] = None,
        lam: Optional[float] = None,
        max_iterations: int = int(1e4),
        tolerance: float = 1e-6,
        verbose: bool = False,
    ) -> None:
        names_hyperparams = ("period", "mu", "lam", "max_iterations", "tolerance")
        super().__init__(
            imputer_params=names_hyperparams,
            groups=groups,
            columnwise=columnwise,
            random_state=random_state,
        )
        self.period = period
        self.mu = mu
        self.lam = lam
        self.max_iterations = max_iterations
        self.tolerance = tolerance
        self.verbose = verbose

    def get_model(self, **hyperparams) -> rpca_pcp.RpcaPcp:
        """Get the underlying model of the imputer based on its attributes.

        Only the hyperparameters understood by the model are kept; every
        one of them has to be present in `hyperparams`.

        Returns
        -------
        rpca.RPCA
            RPCA model to be used in the fit and transform methods.

        """
        names_kept = ("mu", "lam", "max_iterations", "tolerance")
        params_model = {name: hyperparams[name] for name in names_kept}
        return rpca_pcp.RpcaPcp(random_state=self._rng, verbose=self.verbose, **params_model)

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Transform the dataframe `df`.

        It does it at the group and/or column level depending on
        self.groups and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe: observed values are those of `df`, missing
            ones come from the decomposition.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        model = self.get_model(**self.get_hyperparams())

        values = df.astype(float).values
        D = utils.prepare_data(values, self.period)
        Omega = ~np.isnan(D)

        # Column-wise standardisation ignoring the missing values; a null
        # standard deviation is replaced by 1 to avoid dividing by zero.
        means = np.nanmean(D, axis=0)
        stds = np.nanstd(D, axis=0)
        stds = np.where(stds, stds, 1)

        M, _ = model.decompose((D - means) / stds, Omega)

        # Back to the original scale, then to the original shape.
        M_final = utils.get_shape_original(M * stds + means, values.shape)
        df_decomposed = pd.DataFrame(M_final, index=df.index, columns=df.columns)
        return df.where(df.notna(), df_decomposed)
class ImputerRpcaNoisy(_Imputer):
    """Noise RPCA imputer.

    This class implements the Robust Principal Component Analysis imputation
    with added noise. The imputation minimizes a loss function combining
    a low-rank criterium on the dataframe and a L1 penalization on the
    residuals.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []
    columnwise : bool
        For the RPCA method to be applied columnwise (with reshaping of
        each column into an array)
        or to be applied directly on the dataframe. By default, the value is
        set to False.
    random_state : RandomSetting, optional
        Controls the randomness of the fit_transform, by default None
    period : int
        Period given to `utils.prepare_data` when reshaping the data,
        by default 1
    rank, tau, lam, list_periods, list_etas, max_iterations, tolerance, norm, verbose
        Forwarded as is to `rpca_noisy.RpcaNoisy`.
    mu : Optional[float]
        Stored on the imputer, by default None

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
        columnwise: bool = False,
        random_state: RandomSetting = None,
        period: int = 1,
        mu: Optional[float] = None,
        rank: Optional[int] = None,
        tau: Optional[float] = None,
        lam: Optional[float] = None,
        list_periods: Tuple[int, ...] = (),
        list_etas: Tuple[float, ...] = (),
        max_iterations: int = int(1e4),
        tolerance: float = 1e-6,
        norm: Optional[str] = "L2",
        verbose: bool = False,
    ) -> None:
        names_hyperparams = (
            "period",
            "mu",
            "rank",
            "tau",
            "lam",
            "list_periods",
            "list_etas",
            "max_iterations",
            "tolerance",
            "norm",
        )
        super().__init__(
            imputer_params=names_hyperparams,
            groups=groups,
            columnwise=columnwise,
            random_state=random_state,
        )
        self.period = period
        self.mu = mu
        self.rank = rank
        self.tau = tau
        self.lam = lam
        self.list_periods = list_periods
        self.list_etas = list_etas
        self.max_iterations = max_iterations
        self.tolerance = tolerance
        self.norm = norm
        self.verbose = verbose

    def get_model(self, **hyperparams) -> rpca_noisy.RpcaNoisy:
        """Get the underlying model of the imputer based on its attributes.

        Only the hyperparameters listed below are kept; every one of them
        has to be present in `hyperparams`.

        Returns
        -------
        rpca.RPCA
            RPCA model to be used in the fit and transform methods.

        """
        # NOTE(review): "mu" is declared in `imputer_params` but is not part
        # of this list, so it never reaches RpcaNoisy -- confirm whether this
        # is intended.
        names_kept = (
            "rank",
            "tau",
            "lam",
            "list_periods",
            "list_etas",
            "max_iterations",
            "tolerance",
            "norm",
        )
        params_model = {name: hyperparams[name] for name in names_kept}
        return rpca_noisy.RpcaNoisy(random_state=self._rng, verbose=self.verbose, **params_model)

    def _fit_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> Tuple[NDArray, NDArray, NDArray]:
        """Fit the imputer on `df`.

        It does it at the group and/or column level depending on
        self.groups and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe on which the imputer is fitted
        col : str, optional
            Column on which the imputer is fitted, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        Tuple
            A tuple made of:
            - the reduced decomposition basis
            - the estimated mean of the columns
            - the estimated standard deviation of the columns

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        model = self.get_model(**self.get_hyperparams())

        values = df.astype(float).values
        D = utils.prepare_data(values, self.period)
        Omega = ~np.isnan(D)

        # Column-wise standardisation ignoring the missing values; a null
        # standard deviation is replaced by 1 to avoid dividing by zero.
        means = np.nanmean(D, axis=0)
        stds = np.nanstd(D, axis=0)
        stds = np.where(stds, stds, 1)

        # Only the last output, the basis, is needed at transform time.
        _, _, _, Q = model.decompose_with_basis((D - means) / stds, Omega)
        return Q, means, stds

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Transform the dataframe `df`.

        It does it at the group and/or column level depending on
        self.groups and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe: observed values are those of `df`, missing
            ones come from the decomposition on the fitted basis.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        model = self.get_model(**self.get_hyperparams())

        values = df.astype(float).values
        D = utils.prepare_data(values, self.period)
        Omega = ~np.isnan(D)

        # Basis and scaling statistics computed by `_fit_element`.
        Q, means, stds = self._dict_fitting[col][ngroup]

        M, _ = model.decompose_on_basis((D - means) / stds, Omega, Q)

        # Back to the original scale, then to the original shape.
        M_final = utils.get_shape_original(M * stds + means, values.shape)
        df_decomposed = pd.DataFrame(M_final, index=df.index, columns=df.columns)
        return df.where(df.notna(), df_decomposed)
class ImputerSoftImpute(_Imputer):
    """SoftImpute imputer.

    This class implements the Soft Impute method:

    Hastie, Trevor, et al. Matrix completion and low-rank SVD via fast
    alternating least squares. The Journal of Machine Learning Research 16.1
    (2015): 3367-3402.

    This imputation technique is less robust than the RPCA, although it can
    be faster.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default []
    columnwise : bool
        For the method to be applied columnwise (with reshaping of
        each column into an array)
        or to be applied directly on the dataframe. By default, the value is
        set to False.
    random_state : RandomSetting, optional
        Controls the randomness of the fit_transform, by default None
    period : int
        Period given to `utils.prepare_data` when reshaping the data,
        by default 1
    tau, max_iterations, tolerance, verbose
        Forwarded as is to `softimpute.SoftImpute`.
    rank : Optional[int]
        Stored on the imputer, by default None

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
        columnwise: bool = False,
        random_state: RandomSetting = None,
        period: int = 1,
        rank: Optional[int] = None,
        tolerance: float = 1e-05,
        tau: Optional[float] = None,
        max_iterations: int = 100,
        verbose: bool = False,
    ):
        names_hyperparams = (
            "period",
            "rank",
            "tolerance",
            "tau",
            "max_iterations",
            "verbose",
        )
        super().__init__(
            imputer_params=names_hyperparams,
            groups=groups,
            columnwise=columnwise,
            random_state=random_state,
        )
        self.period = period
        self.rank = rank
        self.tolerance = tolerance
        self.tau = tau
        self.max_iterations = max_iterations
        self.verbose = verbose

    def get_model(self, **hyperparams) -> softimpute.SoftImpute:
        """Get the underlying model of the imputer based on its attributes.

        Only the hyperparameters listed below are kept; every one of them
        has to be present in `hyperparams`.

        Returns
        -------
        softimpute.SoftImpute
            Soft Impute model to be used in the transform method.

        """
        # NOTE(review): "rank" is declared in `imputer_params` but is not part
        # of this list, so it never reaches SoftImpute -- confirm whether this
        # is intended.
        names_kept = ("tau", "max_iterations", "tolerance")
        params_model = {name: hyperparams[name] for name in names_kept}
        return softimpute.SoftImpute(random_state=self._rng, verbose=self.verbose, **params_model)

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Transform the dataframe `df`.

        It does it at the group and/or column level depending on
        self.groups and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe: observed values are those of `df`, missing
            ones are the sum of the two outputs of the decomposition.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        model = self.get_model(**self.get_hyperparams())

        values = df.astype(float).values
        D = utils.prepare_data(values, self.period)
        Omega = ~np.isnan(D)

        M, A = model.decompose(D, Omega)

        # Both outputs are brought back to the original shape before summing.
        shape = values.shape
        X_imputed = utils.get_shape_original(M, shape) + utils.get_shape_original(A, shape)
        df_decomposed = pd.DataFrame(X_imputed, index=df.index, columns=df.columns)
        return df.where(df.notna(), df_decomposed)
class ImputerEM(_Imputer):
    """EM imputer.

    This class implements an imputation method based on joint modelling and
    an inference using an Expectation-Maximization algorithm.

    Parameters
    ----------
    groups : Tuple[str, ...], default=()
        List of column names to group by.
    model : {'multinormal', 'VAR'}, default='multinormal'
        Method defining the hypothesis made on the data distribution.
        Possible values:
        - 'multinormal' : the data points are independent and identically
          distributed following a multinormal distribution
        - 'VAR' : the data is a time series modeled by a VAR(p) process
    columnwise : bool, default=False
        If False, correlations between variables will be used, which is
        advised. If True, each column is imputed independently. For the
        multinormal case each value will be imputed by the mean up to a
        noise of fixed amplitude, for the VAR case the imputation will be a
        noisy temporal interpolation.
    random_state : RandomSetting, optional
        Controls the randomness of the fit_transform, by default None
    method : {'mle', 'sample'}, default='sample'
        Imputation method after EM convergence.
        - 'mle' : Maximum Likelihood Estimation
        - 'sample' : Sample from the posterior distribution
    max_iter_em : int, default=200
        Maximum number of EM iterations.
    n_iter_ou : int, default=50
        Number of Ornstein-Uhlenbeck process iterations for sampling.
    ampli : float, default=1
        Amplitude parameter for the Ornstein-Uhlenbeck process.
    dt : float, default=0.02
        Time step for the Ornstein-Uhlenbeck process discretization.
    tolerance : float, default=1e-4
        Convergence tolerance for EM algorithm.
    stagnation_threshold : float, default=5e-3
        Threshold for element-wise stagnation detection in EM algorithm.
    stagnation_loglik : float, default=2
        Threshold for log-likelihood stagnation in EM algorithm.
    period : int, default=1
        If different from 1, the data is folded with respect to the given
        period before applying the imputation.
    verbose : bool, default=False
        If True, print convergence information during fitting.
    p : int, optional
        Order of the VAR process (only used when model='VAR'),
        by default None

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
        model: Optional[str] = "multinormal",
        columnwise: bool = False,
        random_state: RandomSetting = None,
        method: Literal["mle", "sample"] = "sample",
        max_iter_em: int = 200,
        n_iter_ou: int = 50,
        ampli: float = 1,
        dt: float = 2e-2,
        tolerance: float = 1e-4,
        stagnation_threshold: float = 5e-3,
        stagnation_loglik: float = 2,
        period: int = 1,
        verbose: bool = False,
        p: Union[None, int] = None,
    ):
        super().__init__(
            imputer_params=(
                "max_iter_em",
                "n_iter_ou",
                "ampli",
                "dt",
                "tolerance",
                "stagnation_threshold",
                "stagnation_loglik",
                "period",
                "p",
            ),
            groups=groups,
            columnwise=columnwise,
            random_state=random_state,
        )
        self.model = model
        self.method = method
        self.max_iter_em = max_iter_em
        self.n_iter_ou = n_iter_ou
        self.ampli = ampli
        self.dt = dt
        self.tolerance = tolerance
        self.stagnation_threshold = stagnation_threshold
        self.stagnation_loglik = stagnation_loglik
        self.period = period
        self.verbose = verbose
        self.p = p

    def get_model(self, **hyperparams) -> em_sampler.EM:
        """Get the underlying model of the imputer based on its attributes.

        Parameters
        ----------
        **hyperparams
            Keyword arguments forwarded to the EM model. The entry "p" is
            dropped for the multinormal model and set to `self.p` for the
            VAR model.

        Returns
        -------
        em_sampler.EM
            EM model to be used in the fit and transform methods.

        Raises
        ------
        ValueError
            If `self.model` is neither "multinormal" nor "VAR".

        """
        if self.model == "multinormal":
            # "p" only makes sense for the VAR model; tolerate its absence so
            # that the method can be called without it.
            hyperparams.pop("p", None)
            return em_sampler.MultiNormalEM(
                random_state=self.random_state,
                method=self.method,
                verbose=self.verbose,
                **hyperparams,
            )
        elif self.model == "VAR":
            hyperparams["p"] = self.p
            return em_sampler.VARpEM(
                random_state=self.random_state,
                method=self.method,
                verbose=self.verbose,
                **(hyperparams),  # type: ignore #noqa
            )
        else:
            raise ValueError(
                f"Model argument `{self.model}` is invalid!"
                " Valid values are `multinormal`and `VAR`."
            )

    def _fit_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> em_sampler.EM:
        """Fit the imputer on `df`.

        It does it at the group and/or column level depending on
        self.groups and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe on which the imputer is fitted
        col : str, optional
            Column on which the imputer is fitted, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        em_sampler.EM
            Return fitted EM model

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        hyperparams = self.get_hyperparams()
        model = self.get_model(**hyperparams)
        # Same float conversion as in `_transform_element`, so that the model
        # is fitted and applied on arrays of the same dtype.
        model = model.fit(df.values.astype(float))
        return model

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Transform the dataframe `df`.

        It does it at the group and/or column level depending on
        self.groups and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe. `df` itself is returned when it has no
            missing value.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)

        # Nothing to impute: skip the model entirely.
        if df.notna().all().all():
            return df

        model = self._dict_fitting[col][ngroup]
        X = df.values.astype(float)
        X_imputed = model.transform(X)

        df_transformed = pd.DataFrame(X_imputed, columns=df.columns, index=df.index)
        return df_transformed