Source code for qolmat.imputations.imputers_pytorch

"""Script for PyTorch imputers."""

import logging
from copy import copy
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from numpy.typing import NDArray
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

# from typing_extensions import Self
from qolmat.benchmark import metrics
from qolmat.imputations.diffusions import ddpms
from qolmat.imputations.imputers import ImputerRegressor, _Imputer
from qolmat.utils.exceptions import (
    EstimatorNotDefined,
    PyTorchExtraNotInstalled,
)
from qolmat.utils.utils import RandomSetting

try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
except ModuleNotFoundError:
    raise PyTorchExtraNotInstalled


# Configure the root logger when this module is imported: INFO level, with
# records formatted as "<timestamp> <level> <message>".
# NOTE(review): this is an import-time side effect on the process-wide root
# logger; `basicConfig` does nothing if the root logger already has handlers.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
)


class ImputerRegressorPyTorch(ImputerRegressor):
    """Imputer regressor based on PyTorch.

    This class inherits from the class ImputerRegressor
    and allows for PyTorch regressors.

    Parameters
    ----------
    groups: Tuple[str, ...]
        List of column names to group by, by default ()
    estimator : torch.nn.Sequential, optional
        PyTorch estimator for imputing a column based on the others
    handler_nan : str
        Can be `fit`, `row` or `column`:
        - if `fit`, the estimator is assumed to be fitted on incomplete
        (partially observed) data,
        - if `row` all non complete rows will be removed from the train
        dataset, and will not be used for the inference,
        - if `column` all non complete columns will be ignored.
        By default, `row`
    epochs: int
        Number of epochs when fitting the estimator, by default 100
    learning_rate: float
        Learning rate when fitting the estimator, by default 0.001
    loss_fn: Callable
        Loss used when fitting the estimator, by default nn.L1Loss()

    """

    def __init__(
        self,
        groups: Tuple[str, ...] = (),
        estimator: Optional[nn.Sequential] = None,
        handler_nan: str = "row",
        epochs: int = 100,
        learning_rate: float = 0.001,
        loss_fn: Callable = nn.L1Loss(),
    ):
        # NOTE(review): "monitor" and "patience" are declared as imputer
        # parameters but are never set as attributes in this class -- confirm
        # that the base class tolerates names without a matching attribute.
        super().__init__(
            imputer_params=("handler_nan", "epochs", "monitor", "patience"),
            groups=groups,
            handler_nan=handler_nan,
        )
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.loss_fn = loss_fn
        self.estimator = estimator

    def _fit_estimator(self, estimator: nn.Sequential, X: pd.DataFrame, y: pd.DataFrame) -> Any:
        """Fit the PyTorch estimator using the provided input and target data.

        The estimator is trained with Adam on the full data (one gradient
        step per epoch, no mini-batching) for `self.epochs` epochs.

        Parameters
        ----------
        estimator: torch.nn.Sequential
            PyTorch estimator for imputing a column based on the others.
        X : pd.DataFrame
            The input data for training.
        y : pd.DataFrame
            The target data for training. Its values are expanded with an
            extra axis at position 1 before being compared to the outputs
            (presumably `y` is one-dimensional -- TODO confirm with callers).

        Returns
        -------
        Any
            Return fitted PyTorch estimator.

        Raises
        ------
        EstimatorNotDefined
            Raises an error if the estimator is not defined.

        """
        if not estimator:
            raise EstimatorNotDefined()
        optimizer = optim.Adam(estimator.parameters(), lr=self.learning_rate)
        loss_fn = self.loss_fn

        # The training data does not change between epochs: convert it to
        # tensors once instead of on every iteration.
        input_data = torch.Tensor(X.values)
        target_data = torch.Tensor(y.values).unsqueeze(1)

        estimator.train()
        with tqdm(total=self.epochs, desc="Training", unit="epoch") as pbar:
            for _ in range(self.epochs):
                optimizer.zero_grad()

                outputs = estimator(input_data)
                loss = loss_fn(outputs, target_data)

                loss.backward()
                optimizer.step()
                pbar.set_postfix(loss=f"{loss.item():.4f}")
                pbar.update(1)
        return estimator

    def _predict_estimator(self, estimator: nn.Sequential, X: pd.DataFrame) -> pd.Series:
        """Perform predictions using the trained PyTorch estimator.

        The forward pass is run in evaluation mode and without gradient
        tracking, so that layers such as dropout or batch normalisation
        behave deterministically. The estimator's previous train/eval mode
        is restored afterwards.

        Parameters
        ----------
        estimator: torch.nn.Sequential
            PyTorch estimator for imputing a column based on the others.
        X : pd.DataFrame
            The input data for prediction.

        Returns
        -------
        pd.Series
            The predicted values, flattened to one dimension and indexed
            positionally (default index, not `X.index`).

        Raises
        ------
        EstimatorNotDefined
            Raises an error if the attribute estimator is not defined.

        """
        if not estimator:
            raise EstimatorNotDefined()
        input_data = torch.Tensor(X.values)

        was_training = estimator.training
        estimator.eval()
        try:
            with torch.no_grad():
                output_data = estimator(input_data)
        finally:
            # Leave the estimator in the mode the caller had it in.
            estimator.train(was_training)

        return pd.Series(output_data.detach().numpy().flatten())


class Autoencoder(nn.Module):
    """Wrapper of a PyTorch autoencoder allowing to encode.

    The latent codes returned by `encode` are standardised with an internal
    `StandardScaler`; `decode` undoes that scaling before calling the decoder.

    Parameters
    ----------
    encoder : nn.Sequential
        The encoder module.
    decoder : nn.Sequential
        The decoder module.
    epochs : int, optional
        Number of epochs for training, by default 100.
    learning_rate : float, optional
        Learning rate for optimization, by default 0.001.
    loss_fn : Callable, optional
        Loss function for training, by default nn.L1Loss().

    """

    def __init__(
        self,
        encoder: nn.Sequential,
        decoder: nn.Sequential,
        epochs: int = 100,
        learning_rate: float = 0.001,
        loss_fn: Callable = nn.L1Loss(),
    ):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.loss_fn = loss_fn
        self.epochs = epochs
        self.learning_rate = learning_rate
        # One list of per-epoch losses is appended for every call to `fit`.
        self.loss: List[List[float]] = []
        # Scaler applied to the latent codes (refitted on every `encode`).
        self.scaler = StandardScaler()

    @staticmethod
    def _apply_in_eval_mode(module: nn.Module, data: NDArray) -> NDArray:
        """Run `module` on `data` for inference and return a numpy array.

        The module is temporarily switched to evaluation mode and gradients
        are not tracked; its previous train/eval mode is restored afterwards.
        """
        was_training = module.training
        module.eval()
        try:
            with torch.no_grad():
                output = module(torch.Tensor(data))
        finally:
            module.train(was_training)
        return output.detach().numpy()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass through the autoencoder.

        Parameters
        ----------
        x : torch.Tensor
            Input data.

        Returns
        -------
        torch.Tensor
            Decoded data, i.e. `decoder(encoder(x))`.

        """
        return self.decoder(self.encoder(x))

    def fit(self, X: NDArray, y: NDArray) -> "Autoencoder":
        """Fit the autoencoder to the data.

        Trains with Adam on the full data (one gradient step per epoch) and
        appends the list of per-epoch losses to `self.loss`.

        Parameters
        ----------
        X : ndarray
            Input data for training.
        y : ndarray
            Target data for training.

        Returns
        -------
        Self
            Return Self

        """
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        loss_fn = self.loss_fn

        # The data is the same for every epoch: convert it only once.
        input_data = torch.Tensor(X)
        target_data = torch.Tensor(y)

        list_loss = []
        self.train()
        for epoch in range(self.epochs):
            optimizer.zero_grad()

            outputs = self(input_data)
            loss = loss_fn(outputs, target_data)

            loss.backward()
            optimizer.step()

            loss_value = loss.item()
            if (epoch + 1) % 10 == 0:
                logging.info("Epoch [%d/%d], Loss: %.4f", epoch + 1, self.epochs, loss_value)
            list_loss.append(loss_value)
        self.loss.append(list_loss)
        return self

    def decode(self, Z: NDArray) -> NDArray:
        """Decode encoded data.

        The scaling applied by the last call to `encode` is inverted before
        the decoder is applied, so `encode` must have been called first.

        Parameters
        ----------
        Z : ndarray
            Encoded (scaled) data.

        Returns
        -------
        ndarray
            Decoded data.

        """
        Z_unscaled = self.scaler.inverse_transform(Z)
        return self._apply_in_eval_mode(self.decoder, Z_unscaled)

    def encode(self, X: NDArray) -> NDArray:
        """Encode input data.

        The internal scaler is refitted on the latent codes at every call.

        Parameters
        ----------
        X : ndarray
            Input data.

        Returns
        -------
        ndarray
            Encoded data, standardised by the internal scaler.

        """
        X_encoded = self._apply_in_eval_mode(self.encoder, X)
        return self.scaler.fit_transform(X_encoded)


class ImputerAutoencoder(_Imputer):
    """Impute missing values by iterative sampling with an autoencoder.

    Missing entries are initialised with the column means; the data is then
    repeatedly encoded, perturbed with Gaussian noise in the latent space and
    decoded, and only the originally missing entries are updated.

    Parameters
    ----------
    encoder : nn.Sequential
        The encoder module.
    decoder : nn.Sequential
        The decoder module.
    groups: Tuple[str, ...]
        List of column names to group by, by default ()
    random_state : RandomSetting, optional
        Controls the randomness of the latent noise, by default None
    lamb: float
        Sampling step
    max_iterations: int
        Maximal number of iterations in the sampling process
    epochs: int
        Number of epochs when fitting the autoencoder, by default 100
    learning_rate: float
        Learning rate when fitting the autoencoder, by default 0.001
    loss_fn: Callable
        Loss used when fitting the autoencoder, by default nn.L1Loss()

    """

    def __init__(
        self,
        encoder: nn.Sequential,
        decoder: nn.Sequential,
        groups: Tuple[str, ...] = (),
        random_state: RandomSetting = None,
        lamb: float = 1e-2,
        max_iterations: int = 100,
        epochs: int = 100,
        learning_rate: float = 0.001,
        loss_fn: Callable = nn.L1Loss(),
    ) -> None:
        super().__init__(
            groups=groups,
            columnwise=False,
            shrink=False,
            random_state=random_state,
        )
        self.loss_fn = loss_fn
        self.lamb = lamb
        self.max_iterations = max_iterations
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.encoder = encoder
        self.decoder = decoder

    def _fit_element(self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0) -> Autoencoder:
        """Fit the imputer on `df`.

        It does that at the group and/or column level depending on self.groups
        and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe on which the imputer is fitted
        col : str, optional
            Column on which the imputer is fitted, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        Autoencoder
            Autoencoder wrapping `self.encoder` and `self.decoder`, fitted to
            reconstruct `df` with missing values replaced by column means.

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        autoencoder = Autoencoder(
            self.encoder,
            self.decoder,
            self.epochs,
            self.learning_rate,
            self.loss_fn,
        )
        # Reconstruction task: the mean-filled data is both input and target.
        X = df.fillna(df.mean()).values
        return autoencoder.fit(X, X)

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Transform the dataframe `df`.

        It does that at the group and/or column level depending on self.groups
        and self.columnwise.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe

        """
        autoencoder = self._dict_fitting[col][ngroup]

        # Start from a mean-filled, standardised copy of the data.
        df_train = df.copy()
        df_train = df_train.fillna(df_train.mean())
        scaler = StandardScaler()
        df_train_scaler = pd.DataFrame(
            scaler.fit_transform(df_train),
            index=df_train.index,
            columns=df_train.columns,
        )
        X = df_train_scaler.values
        mask = df.isna().values

        for _ in range(self.max_iterations):
            # Refine the model fetched for this column/group on the current
            # estimate. The imputer's own public `fit` must not be re-entered
            # from inside a transform step.
            autoencoder.fit(X, X)
            Z = autoencoder.encode(X)
            # Shrink the latent codes and add Gaussian noise of variance lamb.
            W = np.sqrt(self.lamb) * self._rng.normal(0, 1, size=Z.shape)
            Z_next = (1 - self.lamb) * Z + W
            X_next = autoencoder.decode(Z_next)
            # Observed entries are kept; only missing ones are updated.
            X[mask] = X_next[mask]

        df_imputed = pd.DataFrame(
            scaler.inverse_transform(X),
            index=df_train.index,
            columns=df_train.columns,
        )
        return df_imputed


def build_mlp(
    input_dim: int,
    list_num_neurons: List[int],
    output_dim: int = 1,
    activation: Callable = nn.ReLU,
) -> nn.Sequential:
    """Build a fully connected network (MLP) from a list of layer widths.

    Each hidden layer is a linear layer followed by a freshly instantiated
    activation; the final linear layer has no activation.

    Parameters
    ----------
    input_dim : int
        Number of input features.
    list_num_neurons : List[int]
        Width of each hidden layer, in order.
    output_dim : int, optional
        Number of output features, defaults to 1.
    activation : Callable, optional
        Zero-argument factory (e.g. a module class) producing the activation
        placed after each hidden layer, defaults to nn.ReLU.

    Returns
    -------
    nn.Sequential
        PyTorch model representing the MLP.

    Examples
    --------
    >>> model = build_mlp(input_dim=10, list_num_neurons=[32, 64, 128], output_dim=1)
    >>> print(model)
    Sequential(
      (0): Linear(in_features=10, out_features=32, bias=True)
      (1): ReLU()
      (2): Linear(in_features=32, out_features=64, bias=True)
      (3): ReLU()
      (4): Linear(in_features=64, out_features=128, bias=True)
      (5): ReLU()
      (6): Linear(in_features=128, out_features=1, bias=True)
    )

    """
    # Consecutive pairs of widths give the (in, out) sizes of hidden layers.
    widths = [input_dim, *list_num_neurons]
    modules = []
    for n_in, n_out in zip(widths[:-1], widths[1:]):
        modules += [nn.Linear(n_in, n_out), activation()]
    modules.append(nn.Linear(widths[-1], output_dim))
    return nn.Sequential(*modules)


def build_autoencoder(
    input_dim: int,
    latent_dim: int,
    list_num_neurons: List[int],
    output_dim: int = 1,
    activation: Callable = nn.ReLU,
) -> Tuple[nn.Sequential, nn.Sequential]:
    """Build an encoder/decoder pair of MLPs with mirrored hidden layers.

    The hidden widths are sorted: the encoder goes from the widest to the
    narrowest layer down to `latent_dim`, the decoder from the narrowest to
    the widest layer up to `output_dim`.

    Parameters
    ----------
    input_dim : int
        Number of input features of the encoder.
    latent_dim : int
        Dimension of the latent space.
    list_num_neurons : List[int]
        Hidden layer widths (their order is irrelevant, they are sorted).
    output_dim : int, optional
        Number of output features of the decoder, defaults to 1.
    activation : Callable, optional
        Zero-argument factory producing the activation placed after each
        hidden layer, defaults to nn.ReLU.

    Returns
    -------
    Tuple[nn.Sequential, nn.Sequential]
        Tuple containing the encoder and decoder models.

    Examples
    --------
    >>> encoder, decoder = build_autoencoder(
    ...     input_dim=10,
    ...     latent_dim=4,
    ...     list_num_neurons=[32, 64, 128],
    ...     output_dim=252,
    ... )
    >>> print(encoder)
    Sequential(
      (0): Linear(in_features=10, out_features=128, bias=True)
      (1): ReLU()
      (2): Linear(in_features=128, out_features=64, bias=True)
      (3): ReLU()
      (4): Linear(in_features=64, out_features=32, bias=True)
      (5): ReLU()
      (6): Linear(in_features=32, out_features=4, bias=True)
    )
    >>> print(decoder)
    Sequential(
      (0): Linear(in_features=4, out_features=32, bias=True)
      (1): ReLU()
      (2): Linear(in_features=32, out_features=64, bias=True)
      (3): ReLU()
      (4): Linear(in_features=64, out_features=128, bias=True)
      (5): ReLU()
      (6): Linear(in_features=128, out_features=252, bias=True)
    )

    """
    widths_ascending = np.sort(list_num_neurons)
    widths_descending = widths_ascending[::-1]

    # The encoder is built first, then the decoder.
    encoder = build_mlp(
        input_dim=input_dim,
        list_num_neurons=widths_descending.tolist(),
        output_dim=latent_dim,
        activation=activation,
    )
    decoder = build_mlp(
        input_dim=latent_dim,
        list_num_neurons=widths_ascending.tolist(),
        output_dim=output_dim,
        activation=activation,
    )
    return encoder, decoder


class ImputerDiffusion(_Imputer):
    """Imputer based on diffusion models.

    This class inherits from the class _Imputer.
    It is a wrapper for imputers based on diffusion models.
    """

    def __init__(
        self,
        model: str = "TabDDPM",
        groups: Tuple[str, ...] = (),
        epochs: int = 100,
        batch_size: int = 100,
        x_valid: Optional[pd.DataFrame] = None,
        print_valid: bool = False,
        metrics_valid: Tuple[Callable, ...] = (
            metrics.mean_absolute_error,
            metrics.dist_wasserstein,
        ),
        round: int = 10,
        cols_imputed: Tuple[str, ...] = (),
        index_datetime: str = "",
        freq_str: str = "1D",
        random_state: RandomSetting = None,
        # Model parameters
        num_noise_steps: int = 50,
        beta_start: float = 1e-4,
        beta_end: float = 0.02,
        lr: float = 0.001,
        ratio_masked: float = 0.1,
        dim_embedding: int = 128,
        dim_feedforward: int = 64,
        num_blocks: int = 1,
        nheads_feature: int = 5,
        nheads_time: int = 8,
        num_layers_transformer: int = 1,
        p_dropout: float = 0.0,
        num_sampling: int = 1,
        is_rolling: bool = False,
    ):
        """Init ImputerDiffusion.

        Parameters
        ----------
        groups : Tuple[str, ...], optional
            List of column names to group by, by default ()
        model : str
            Name of the imputer based on diffusion models
            (e.g., TabDDPM, TsDDPM), by default `TabDDPM`
        epochs : int, optional
            Number of epochs, by default 100
        batch_size : int, optional
            Batch size, by default 100
        x_valid : pd.DataFrame, optional
            Dataframe for validation, by default None
        print_valid : bool, optional
            Print model performance for after several epochs,
            by default False
        metrics_valid : Tuple[Callable, ...], optional
            Set of validation metrics, by default
            (metrics.mean_absolute_error, metrics.dist_wasserstein)
        round : int, optional
            Number of decimal places to round to, for better displaying
            model performance, by default 10
        cols_imputed : Tuple[str, ...], optional
            Name of columns that need to be imputed, by default ()
        index_datetime : str
            Name of datetime-like index.
            It is for processing time-series data, used in diffusion models
            e.g., TsDDPM.
        freq_str : str
            Frequency string of DateOffset of Pandas.
            It is for processing time-series data, used in diffusion models
            e.g., TsDDPM.
        random_state : RandomSetting, optional
            Controls the randomness of the fit_transform, by default None
        num_noise_steps : int, optional
            Number of noise steps, by default 50
        beta_start : float, optional
            Range of beta (noise scale value), by default 1e-4
        beta_end : float, optional
            Range of beta (noise scale value), by default 0.02
        lr : float, optional
            Learning rate, by default 0.001
        ratio_masked : float, optional
            Ratio of artificial nan for training and validation,
            by default 0.1
        dim_embedding : int, optional
            Embedding dimension, by default 128
        dim_feedforward : int, optional
            Feedforward layer dimension in Transformers, by default 64
        num_blocks : int, optional
            Number of residual blocks, by default 1
        nheads_feature : int, optional
            Number of heads to encode feature-based context, by default 5
        nheads_time : int, optional
            Number of heads to encode time-based context, by default 8
        num_layers_transformer : int, optional
            Number of transformer layer, by default 1
        p_dropout : float, optional
            Dropout probability, by default 0.0
        num_sampling : int, optional
            Number of samples generated for each cell, by default 1
        is_rolling : bool, optional
            Use pandas.DataFrame.rolling for preprocessing data,
            by default False

        Examples
        --------
        >>> import numpy as np
        >>> from qolmat.imputations.imputers_pytorch import ImputerDiffusion
        >>>
        >>> X = np.array(
        ...     [
        ...         [1, 1, 1, 1],
        ...         [np.nan, np.nan, 3, 2],
        ...         [1, 2, 2, 1],
        ...         [2, 2, 2, 2],
        ...     ]
        ... )
        >>> imputer = ImputerDiffusion(epochs=50, batch_size=1, random_state=11)
        >>>
        >>> df_imputed = imputer.fit_transform(X)

        """
        super().__init__(groups=groups, columnwise=False)
        self.model = model
        self.epochs = epochs
        self.batch_size = batch_size
        self.x_valid = x_valid
        self.print_valid = print_valid
        self.metrics_valid = metrics_valid
        self.round = round
        self.cols_imputed = cols_imputed
        self.index_datetime = index_datetime
        self.freq_str = freq_str
        self.random_state = random_state
        self.num_noise_steps = num_noise_steps
        self.beta_start = beta_start
        self.beta_end = beta_end
        self.lr = lr
        self.ratio_masked = ratio_masked
        self.dim_embedding = dim_embedding
        self.dim_feedforward = dim_feedforward
        self.num_blocks = num_blocks
        self.nheads_feature = nheads_feature
        self.nheads_time = nheads_time
        self.num_layers_transformer = num_layers_transformer
        self.p_dropout = p_dropout
        self.num_sampling = num_sampling
        self.is_rolling = is_rolling

    def get_model(self) -> ddpms.TabDDPM:
        """Get the underlying model of the imputer based on its attributes.

        Returns
        -------
        ddpms.TabDDPM
            Model (TabDDPM or TsDDPM, depending on `self.model`) to be used
            in the fit and transform methods.

        Raises
        ------
        ValueError
            If `self.model` is neither `TabDDPM` nor `TsDDPM`.

        """
        params_model = self.get_params_model()
        if self.model == "TabDDPM":
            return ddpms.TabDDPM(
                random_state=self.random_state,
                **params_model,
            )
        elif self.model == "TsDDPM":
            return ddpms.TsDDPM(
                random_state=self.random_state,
                **params_model,  # type: ignore #noqa
            )
        else:
            raise ValueError(
                f"Model argument `{self.model}` is invalid!"
                " Valid values are `TabDDPM`and `TsDDPM`."
            )

    def get_params_model(self) -> dict:
        """Get parameters for creating a DDPM model.

        Only instance attributes whose name is in the list relevant to the
        selected model are returned; names without a matching attribute are
        skipped silently.

        Returns
        -------
        dict
            A dictionary containing the parameters required to create
            a model of type TabDDPM or TsDDPM.

        """
        list_params = [
            "num_noise_steps",
            "beta_start",
            "beta_end",
            "lr",
            "ratio_masked",
            "dim_embedding",
            "num_blocks",
            "p_dropout",
            "num_sampling",
        ]
        if self.model == "TabDDPM":
            # NOTE(review): `is_clip` is not set by __init__, so it is only
            # forwarded if the attribute has been assigned elsewhere.
            list_params += ["is_clip"]
        elif self.model == "TsDDPM":
            list_params += [
                "dim_feedforward",
                "nheads_feature",
                "nheads_time",
                "num_layers_transformer",
                "is_rolling",
            ]

        dict_params = {key: value for key, value in self.__dict__.items() if key in list_params}
        return dict_params

    def _fit_element(self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0):
        """Fit the imputer on `df`.

        It does it at the group and/or column level depending on self.groups
        and self.columnwise. A shallow copy of the fitted model is also kept
        in `self._model_fitted` for the summary methods.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe on which the imputer is fitted
        col : str, optional
            Column on which the imputer is fitted, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        Any
            Return fitted model

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        model = self.get_model()
        hp_fit = self._get_params_fit()
        model = model.fit(df, **hp_fit)
        self._model_fitted = copy(model)
        return model

    def _transform_element(
        self, df: pd.DataFrame, col: str = "__all__", ngroup: int = 0
    ) -> pd.DataFrame:
        """Transform the dataframe `df`.

        It does it at the group and/or column level depending on self.groups
        and self.columnwise. A dataframe without any missing value is
        returned as is.

        Parameters
        ----------
        df : pd.DataFrame
            Dataframe or column to impute
        col : str, optional
            Column transformed by the imputer, by default "__all__"
        ngroup : int, optional
            ID of the group on which the method is applied

        Returns
        -------
        pd.DataFrame
            Imputed dataframe

        Raises
        ------
        NotDataFrame
            Input has to be a pandas.DataFrame.

        """
        self._check_dataframe(df)
        # Nothing to impute: skip the (costly) model prediction.
        if df.notna().all().all():
            return df
        model = self._dict_fitting[col][ngroup]
        df_imputed = model.predict(df)
        return df_imputed

    def _get_params_fit(self) -> Dict:
        """Get the keyword arguments passed to the model's `fit` method.

        Returns
        -------
        Dict
            Training hyperparameters; `index_datetime` and `freq_str` are
            included only when `self.index_datetime` is not empty.

        """
        hyperparams = {
            "epochs": self.epochs,
            "batch_size": self.batch_size,
            "x_valid": self.x_valid,
            "print_valid": self.print_valid,
            "metrics_valid": self.metrics_valid,
            "round": self.round,
            "cols_imputed": self.cols_imputed,
        }
        if self.index_datetime != "":
            hyperparams = {
                **hyperparams,
                **{
                    "index_datetime": self.index_datetime,
                    "freq_str": self.freq_str,
                },
            }

        return hyperparams

    def get_summary_training(self) -> Dict:
        """Get the summary of the training.

        Requires the imputer to have been fitted, since it reads the model
        stored by `_fit_element`.

        Returns
        -------
        Dict
            Summary of the training

        """
        model = self._model_fitted
        return model.summary

    def get_summary_architecture(self) -> Dict:
        """Get the summary of the architecture.

        Requires the imputer to have been fitted, since it reads the model
        stored by `_fit_element`.

        Returns
        -------
        Dict
            Summary of the architecture, with keys `number_parameters`
            and `epsilon_model`.

        """
        model = self._model_fitted
        eps_model = model._get_eps_model()
        return {
            "number_parameters": model.get_num_params(),
            "epsilon_model": eps_model,
        }