"""Script for DDPM classes."""
import logging
import time
from datetime import timedelta
from typing import Any, Callable, Dict, List, Tuple
import numpy as np
import pandas as pd
import torch
from sklearn import preprocessing
from sklearn import utils as sku
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
# from typing_extensions import Self
from qolmat.benchmark import metrics, missing_patterns
from qolmat.imputations.diffusions.base import (
AutoEncoder,
ResidualBlock,
ResidualBlockTS,
)
from qolmat.utils.utils import RandomSetting
logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
)
[docs]class TabDDPM:
"""Tab DDPM.
Diffusion model for tabular data based on
Denoising Diffusion Probabilistic Models (DDPM) of
Ho et al., 2020 (https://arxiv.org/abs/2006.11239),
Tashiro et al., 2021 (https://arxiv.org/abs/2107.03502).
This implementation follows the implementations found in
https://github.com/quickgrid/pytorch-diffusion/tree/main,
https://github.com/ermongroup/CSDI/tree/main
"""
[docs] def __init__(
self,
num_noise_steps: int = 50,
beta_start: float = 1e-4,
beta_end: float = 0.02,
lr: float = 0.001,
ratio_masked: float = 0.1,
dim_embedding: int = 128,
num_blocks: int = 1,
p_dropout: float = 0.0,
num_sampling: int = 1,
is_clip: bool = True,
random_state: RandomSetting = None,
):
"""Init function.
Parameters
----------
num_noise_steps : int, optional
Number of noise steps, by default 50
beta_start : float, optional
Range of beta (noise scale value), by default 1e-4
beta_end : float, optional
Range of beta (noise scale value), by default 0.02
lr : float, optional
Learning rate, by default 0.001
ratio_masked : float, optional
Ratio of artificial nan for training and validation, by default 0.1
dim_embedding : int, optional
Embedding dimension, by default 128
num_blocks : int, optional
Number of residual block in epsilon model, by default 1
p_dropout : float, optional
Dropout probability, by default 0.0
num_sampling : int, optional
Number of samples generated for each cell, by default 1
is_clip : bool, optional
if values have to be clipped, by default True
random_state : int, RandomState instance or None, default=None
Controls the randomness.
Pass an int for reproducible output across multiple function calls.
"""
self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Hyper-parameters for DDPM
# Section 2, equation 1, num_noise_steps is T.
self.num_noise_steps = num_noise_steps
# Section 2, equation 4 and near explanation for alpha, alpha hat, beta.
self.beta_start = beta_start
self.beta_end = beta_end
self.beta = torch.linspace(
start=self.beta_start,
end=self.beta_end,
steps=self.num_noise_steps,
device=self.device,
) # Linear noise schedule
self.alpha = 1 - self.beta
self.alpha_hat = torch.cumprod(self.alpha, dim=0)
# Section 3.2, algorithm 1 formula implementation.
# Generate values early reuse later.
self.sqrt_alpha_hat = torch.sqrt(self.alpha_hat)
self.sqrt_one_minus_alpha_hat = torch.sqrt(1 - self.alpha_hat)
# Section 3.2, equation 2 precalculation values.
self.sqrt_alpha = torch.sqrt(self.alpha)
self.std_beta = torch.sqrt(self.beta)
# Hyper-parameters for building and training the model
self.loss_func = torch.nn.MSELoss(reduction="none")
self.lr = lr
self.ratio_masked = ratio_masked
self.num_noise_steps = num_noise_steps
self.dim_embedding = dim_embedding
self.num_blocks = num_blocks
self.p_dropout = p_dropout
self.num_sampling = num_sampling
self.is_clip = is_clip
self.normalizer_x = preprocessing.StandardScaler()
self.random_state = sku.check_random_state(random_state)
seed_torch = self.random_state.randint(2**31 - 1)
torch.manual_seed(seed_torch)
def __getstate__(self) -> dict[str, Any]:
"""Hashing method used in sklearn check tests.
Returns
-------
________
str
Hashed object containing the underlying model weights
"""
state = self.__dict__.copy()
if "optimiser" in state:
state.pop("optimiser")
return state
def _q_sample(self, x: torch.Tensor, t: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""Sample q.
Section 3.2, algorithm 1 formula implementation. Forward process,
defined by `q`. Found in section 2. `q` gradually adds gaussian noise
according to variance schedule. Also, can be seen on figure 2.
Ho et al., 2020 (https://arxiv.org/abs/2006.11239)
Parameters
----------
x : torch.Tensor
Data input
t : torch.Tensor
Noise step
Returns
-------
Tuple[torch.Tensor, torch.Tensor]
Noised data at noise step t
"""
sqrt_alpha_hat = self.sqrt_alpha_hat[t].view(-1, 1)
sqrt_one_minus_alpha_hat = self.sqrt_one_minus_alpha_hat[t].view(-1, 1)
epsilon = torch.randn_like(x, device=self.device)
return sqrt_alpha_hat * x + sqrt_one_minus_alpha_hat * epsilon, epsilon
def _get_eps_model(self) -> AutoEncoder:
model = AutoEncoder(
num_noise_steps=self.num_noise_steps,
dim_input=self.dim_input,
residual_block=ResidualBlock(self.dim_embedding, self.dim_embedding, self.p_dropout),
dim_embedding=self.dim_embedding,
num_blocks=self.num_blocks,
p_dropout=self.p_dropout,
)
return model
def _set_eps_model(self) -> None:
model = self._get_eps_model()
self._eps_model = model.to(self.device)
self.optimiser = torch.optim.Adam(self._eps_model.parameters(), lr=self.lr)
[docs] def get_num_params(self) -> int:
"""Compute the number of parameters of the underlying model.
Returns
-------
int: Number of parameters if the model has been fitted,
0 otherwise.
"""
if hasattr(self, "_eps_model"):
model_parameters = filter(lambda p: p.requires_grad, self._eps_model.parameters())
params = sum([np.prod(p.size()) for p in model_parameters])
return int(params)
else:
return 0
def _print_valid(self, epoch: int, time_duration: float) -> None:
"""Print model performance on validation data.
Parameters
----------
epoch : int
Epoch of the printed performance
time_duration : float
Duration for training step
"""
self.time_durations.append(time_duration)
print_step = 1 if int(self.epochs / 10) == 0 else int(self.epochs / 10)
if self.print_valid and epoch == 0:
n_params = self.get_num_params()
logging.info(f"Num params of {self.__class__.__name__}: {n_params}")
if self.print_valid and epoch % print_step == 0:
string_valid = f"Epoch {epoch}: "
for s in self.summary:
string_valid += f" {s}={round(self.summary[s][epoch], self.round)}"
# string_valid += f" | in {round(time_duration, 3)} secs"
remaining_duration = np.mean(self.time_durations) * (self.epochs - epoch)
string_valid += f" | remaining {timedelta(seconds=remaining_duration)}"
logging.info(string_valid)
def _impute(self, x: np.ndarray, x_mask_obs: np.ndarray) -> np.ndarray:
"""Impute data array.
Parameters
----------
x : np.ndarray
Input data
x_mask_obs : np.ndarray
Observed value mask
Returns
-------
np.ndarray
Imputed data
"""
x_tensor = torch.from_numpy(x).float().to(self.device)
x_mask_tensor = torch.from_numpy(x_mask_obs).float().to(self.device)
dataloader = DataLoader(
TensorDataset(x_tensor, x_mask_tensor),
batch_size=self.batch_size,
drop_last=False,
shuffle=False,
)
with torch.no_grad():
outputs = []
for id_batch, (x_batch, mask_x_batch) in enumerate(dataloader):
noise = torch.randn(x_batch.size(), device=self.device)
for i in reversed(range(1, self.num_noise_steps)):
t = (
torch.ones(
(x_batch.size(dim=0), 1),
dtype=torch.long,
device=self.device,
)
* i
)
if len(x_batch.size()) == 3:
# Data are split into chunks
# (i.e., Time-series data),
# a window of rows
# is processed.
sqrt_alpha_t = self.sqrt_alpha[t].view(-1, 1, 1)
beta_t = self.beta[t].view(-1, 1, 1)
sqrt_one_minus_alpha_hat_t = self.sqrt_one_minus_alpha_hat[t].view(
-1, 1, 1
)
epsilon_t = self.std_beta[t].view(-1, 1, 1)
else:
# Each row of data is separately processed.
sqrt_alpha_t = self.sqrt_alpha[t].view(-1, 1)
beta_t = self.beta[t].view(-1, 1)
sqrt_one_minus_alpha_hat_t = self.sqrt_one_minus_alpha_hat[t].view(-1, 1)
epsilon_t = self.std_beta[t].view(-1, 1)
random_noise = torch.randn_like(noise) if i > 1 else torch.zeros_like(noise)
noise = (
(1 / sqrt_alpha_t)
* (
noise
- ((beta_t / sqrt_one_minus_alpha_hat_t) * self._eps_model(noise, t))
)
) + (epsilon_t * random_noise)
noise = mask_x_batch * x_batch + (1.0 - mask_x_batch) * noise
# Generate data output, this activation function depends on
# normalizer_x
x_out = noise.detach().cpu().numpy()
outputs.append(x_out)
outputs = np.concatenate(outputs)
return np.array(outputs)
def _eval(
self,
x: np.ndarray,
x_mask_obs: np.ndarray,
x_df: pd.DataFrame,
x_mask_obs_df: pd.DataFrame,
x_indices: List,
) -> Dict:
"""Evaluate the model.
Parameters
----------
x : np.ndarray
Input data - Array (after pre-processing)
x_mask_obs : np.ndarray
Observed value mask (after pre-processing)
x_df : pd.DataFrame
Reference dataframe before pre-processing
x_mask_obs_df : pd.DataFrame
Observed value mask before pre-processing
x_indices : List
List of row indices for batches
Returns
-------
Dict
Scores
"""
list_x_imputed = []
for i in tqdm(range(self.num_sampling), disable=True, leave=False):
x_imputed = self._impute(x, x_mask_obs)
list_x_imputed.append(x_imputed)
x_imputed = np.mean(np.array(list_x_imputed), axis=0)
x_out = self._process_reversely_data(x_imputed, x_df, x_indices)
if self.is_clip:
for col, interval in self.interval_x.items():
x_out[col] = np.clip(x_out[col], interval[0], interval[1])
x_final = x_df.copy()
x_final.loc[x_out.index] = x_out.loc[x_out.index]
x_mask_imputed_df = ~x_mask_obs_df
columns_with_True = x_mask_imputed_df.columns[(x_mask_imputed_df).any()]
scores = {}
for metric in self.metrics_valid:
scores[metric.__name__] = metric(
x_df[columns_with_True],
x_final[columns_with_True],
x_mask_imputed_df[columns_with_True],
).mean()
return scores
def _process_data(
self,
x: pd.DataFrame,
mask: pd.DataFrame = None,
is_training: bool = False,
) -> Tuple[np.ndarray, np.ndarray, List]:
"""Pre-process data.
Parameters
----------
x : pd.DataFrame
Input data
mask : pd.DataFrame, optional
Observed value mask, by default None
is_training : bool
Processing data for training step
Returns
-------
Tuple[np.ndarray, np.ndarray]
Data and mask pre-processed
"""
if is_training:
self.normalizer_x.fit(x.values)
x_windows_processed = self.normalizer_x.transform(x.fillna(x.mean()).values)
x_windows_mask_processed = ~x.isna().to_numpy()
if mask is not None:
x_windows_mask_processed = mask.to_numpy()
return x_windows_processed, x_windows_mask_processed, list(x.index)
def _process_reversely_data(
self, x_imputed: np.ndarray, x_input: pd.DataFrame, x_indices: List
):
x_normalized = self.normalizer_x.inverse_transform(x_imputed)
x_normalized = x_normalized[: x_input.shape[0]]
x_out = pd.DataFrame(x_normalized, columns=self.columns, index=x_input.index)
x_final = x_input.copy()
x_final.loc[x_out.index] = x_out.loc[x_out.index]
return x_final
[docs] def fit(
self,
x: pd.DataFrame,
epochs: int = 10,
batch_size: int = 100,
print_valid: bool = False,
x_valid: pd.DataFrame = None,
metrics_valid: Tuple[Callable, ...] = (
metrics.mean_absolute_error,
metrics.dist_wasserstein,
),
round: int = 10,
cols_imputed: Tuple[str, ...] = (),
) -> "TabDDPM":
"""Fit data.
Parameters
----------
x : pd.DataFrame
Input dataframe
epochs : int, optional
Number of epochs, by default 10
batch_size : int, optional
Batch size, by default 100
print_valid : bool, optional
Print model performance for after several epochs, by default False
x_valid : pd.DataFrame, optional
Dataframe for validation, by default None
metrics_valid : Tuple[Callable, ...], optional
Set of validation metrics, by default (metrics.mean_absolute_error,
metrics.dist_wasserstein)
round : int, optional
Number of decimal places to round to, for better displaying model
performance, by default 10
cols_imputed : Tuple[str, ...], optional
Name of columns that need to be imputed, by default ()
Raises
------
ValueError
Batch size is larger than data size
Returns
-------
Self
Return Self
"""
seed_torch = self.random_state.randint(2**31 - 1)
torch.manual_seed(seed_torch)
self.dim_input = len(x.columns)
self.epochs = epochs
self.batch_size = batch_size
self.columns = x.columns.tolist()
self.metrics_valid = metrics_valid
self.print_valid = print_valid
self.cols_imputed = cols_imputed
self.round = round
self.time_durations: List = []
self.cols_idx_not_imputed: List = []
if len(self.cols_imputed) != 0:
self.cols_idx_not_imputed = [
idx for idx, col in enumerate(self.columns) if col not in self.cols_imputed
]
self.interval_x = {col: [x[col].min(), x[col].max()] for col in self.columns}
# x_mask: 1 for observed values, 0 for nan
x_processed, x_mask, _ = self._process_data(x, is_training=True)
if self.batch_size > x_processed.shape[0]:
raise ValueError(
f"Batch size {self.batch_size} larger than size of "
"pre-processed x "
f"size={x_processed.shape[0]}. Please reduce batch_size. "
"In the case of TabDDPMTS, you can also reduce freq_str."
)
if x_valid is not None:
# We reuse the UniformHoleGenerator to generate artificial holes
# (with one mask)
# in validation dataset
x_valid_mask = missing_patterns.UniformHoleGenerator(
n_splits=1, ratio_masked=self.ratio_masked
).split(x_valid)[0]
# x_valid_obs_mask is the mask for observed values
x_valid_obs_mask = ~x_valid_mask
(
x_processed_valid,
x_processed_valid_obs_mask,
x_processed_valid_indices,
) = self._process_data(x_valid, x_valid_obs_mask, is_training=False)
x_tensor = torch.from_numpy(x_processed).float().to(self.device)
x_mask_tensor = torch.from_numpy(x_mask).float().to(self.device)
dataloader = DataLoader(
TensorDataset(x_tensor, x_mask_tensor),
batch_size=batch_size,
drop_last=True,
shuffle=True,
)
self._set_eps_model()
self.summary: Dict[str, List] = {
"epoch_loss": [],
}
for epoch in range(epochs):
loss_epoch = 0.0
time_start = time.time()
self._eps_model.train()
for id_batch, (x_batch, mask_x_batch) in enumerate(dataloader):
mask_obs_rand = (
torch.FloatTensor(mask_x_batch.size()).uniform_() > self.ratio_masked
)
for col in self.cols_idx_not_imputed:
mask_obs_rand[:, col] = 0.0
mask_x_batch = mask_x_batch * mask_obs_rand.to(self.device)
self.optimiser.zero_grad()
t = torch.randint(
low=1,
high=self.num_noise_steps,
size=(x_batch.size(dim=0), 1),
device=self.device,
)
x_batch_t, noise = self._q_sample(x=x_batch, t=t)
predicted_noise = self._eps_model(x=x_batch_t, t=t)
loss = (self.loss_func(predicted_noise, noise) * mask_x_batch).mean()
loss.backward()
self.optimiser.step()
loss_epoch += loss.item()
self.summary["epoch_loss"].append(np.mean(loss_epoch))
if x_valid is not None:
self._eps_model.eval()
dict_loss = self._eval(
x_processed_valid,
x_processed_valid_obs_mask,
x_valid,
x_valid_obs_mask,
x_processed_valid_indices,
)
for name_loss, value_loss in dict_loss.items():
if name_loss not in self.summary:
self.summary[name_loss] = [value_loss]
else:
self.summary[name_loss].append(value_loss)
time_duration = time.time() - time_start
self._print_valid(epoch, time_duration)
return self
[docs] def predict(self, x: pd.DataFrame) -> pd.DataFrame:
"""Predict/impute data.
Parameters
----------
x : pd.DataFrame
Data needs to be imputed
Returns
-------
pd.DataFrame
Imputed data
"""
seed_torch = self.random_state.randint(2**31 - 1)
torch.manual_seed(seed_torch)
self._eps_model.eval()
x_processed, x_mask, x_indices = self._process_data(x, is_training=False)
list_x_imputed = []
for i in tqdm(range(self.num_sampling), leave=False):
x_imputed = self._impute(x_processed, x_mask)
list_x_imputed.append(x_imputed)
x_imputed = np.mean(np.array(list_x_imputed), axis=0)
x_out = self._process_reversely_data(x_imputed, x, x_indices)
if self.is_clip:
for col, interval in self.interval_x.items():
x_out[col] = np.clip(x_out[col], interval[0], interval[1])
x_out = x.fillna(x_out)
return x_out
[docs]class TsDDPM(TabDDPM):
"""Time series DDPM.
Diffusion model for time-series data based on
Denoising Diffusion Probabilistic Models (DDPMs) of
Ho et al., 2020 (https://arxiv.org/abs/2006.11239),
Tashiro et al., 2021 (https://arxiv.org/abs/2107.03502).
This implementation follows the implementations found in
https://github.com/quickgrid/pytorch-diffusion/tree/main,
https://github.com/ermongroup/CSDI/tree/main
"""
[docs] def __init__(
self,
num_noise_steps: int = 50,
beta_start: float = 1e-4,
beta_end: float = 0.02,
lr: float = 0.001,
ratio_masked: float = 0.1,
dim_embedding: int = 128,
dim_feedforward: int = 64,
num_blocks: int = 1,
nheads_feature: int = 5,
nheads_time: int = 8,
num_layers_transformer: int = 1,
p_dropout: float = 0.0,
num_sampling: int = 1,
is_rolling: bool = False,
random_state: RandomSetting = None,
):
"""Init function.
Parameters
----------
num_noise_steps : int, optional
Number of noise steps, by default 50
beta_start : float, optional
Range of beta (noise scale value), by default 1e-4
beta_end : float, optional
Range of beta (noise scale value), by default 0.02
lr : float, optional
Learning rate, by default 0.001
ratio_masked : float, optional
Ratio of artificial nan for training and validation, by default 0.1
dim_embedding : int, optional
Embedding dimension, by default 128
dim_feedforward : int, optional
Feedforward layer dimension in Transformers, by default 64
num_blocks : int, optional
Number of residual blocks, by default 1
nheads_feature : int, optional
Number of heads to encode feature-based context, by default 5
nheads_time : int, optional
Number of heads to encode time-based context, by default 8
num_layers_transformer : int, optional
Number of transformer layer, by default 1
p_dropout : float, optional
Dropout probability, by default 0.0
num_sampling : int, optional
Number of samples generated for each cell, by default 1
is_rolling : bool, optional
Use pandas.DataFrame.rolling for preprocessing data,
by default False
random_state : int, RandomState instance or None, default=None
Controls the randomness.
Pass an int for reproducible output across multiple function calls.
"""
super().__init__(
num_noise_steps,
beta_start,
beta_end,
lr,
ratio_masked,
dim_embedding,
num_blocks,
p_dropout,
num_sampling,
random_state=random_state,
)
self.dim_feedforward = dim_feedforward
self.nheads_feature = nheads_feature
self.nheads_time = nheads_time
self.num_layers_transformer = num_layers_transformer
self.is_rolling = is_rolling
def _q_sample(self, x: torch.Tensor, t: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""Sample q.
Section 3.2, algorithm 1 formula implementation. Forward process,
defined by `q`. Found in section 2. `q` gradually adds gaussian noise
according to variance schedule. Also, can be seen on figure 2.
Parameters
----------
x : torch.Tensor
Data input
t : torch.Tensor
Noise step
Returns
-------
Tuple[torch.Tensor, torch.Tensor]
Noised data at noise step t
"""
sqrt_alpha_hat = self.sqrt_alpha_hat[t].view(-1, 1, 1)
sqrt_one_minus_alpha_hat = self.sqrt_one_minus_alpha_hat[t].view(-1, 1, 1)
epsilon = torch.randn_like(x, device=self.device)
return sqrt_alpha_hat * x + sqrt_one_minus_alpha_hat * epsilon, epsilon
def _set_eps_model(self):
self._eps_model = AutoEncoder(
num_noise_steps=self.num_noise_steps,
dim_input=self.dim_input,
residual_block=ResidualBlockTS(
self.dim_embedding,
self.size_window,
self.dim_embedding,
self.dim_feedforward,
self.nheads_feature,
self.nheads_time,
self.num_layers_transformer,
),
dim_embedding=self.dim_embedding,
num_blocks=self.num_blocks,
p_dropout=self.p_dropout,
).to(self.device)
self.optimiser = torch.optim.Adam(self._eps_model.parameters(), lr=self.lr)
def _process_data(
self,
x: pd.DataFrame,
mask: pd.DataFrame = None,
is_training: bool = False,
) -> Tuple[np.ndarray, np.ndarray, List]:
"""Pre-process data.
Parameters
----------
x : pd.DataFrame
Input data
mask : pd.DataFrame, optional
Observed value mask, by default None
is_training : bool
Processing data for training step
Returns
-------
Tuple[np.ndarray, np.ndarray]
Data and mask pre-processed
"""
if is_training:
self.normalizer_x.fit(x.values)
x_windows: List = []
x_windows_indices: List = []
columns_index = [col for col in x.index.names if col != self.index_datetime]
if is_training:
if self.is_rolling:
if self.print_valid:
logging.info(
"Preprocessing data with sliding window "
"(pandas.DataFrame.rolling) "
"can require more times than usual. "
"Please be patient!"
)
if len(columns_index) == 0:
x_windows = x.rolling(window=self.freq_str)
else:
columns_index_ = columns_index[0] if len(columns_index) == 1 else columns_index
for x_group in tqdm(x.groupby(by=columns_index_), disable=True, leave=False):
x_windows += list(
x_group[1].droplevel(columns_index).rolling(window=self.freq_str)
)
else:
for x_w in x.resample(rule=self.freq_str, level=self.index_datetime):
x_windows.append(x_w[1])
else:
if self.is_rolling:
if len(columns_index) == 0:
indices_nan = x.loc[x.isna().any(axis=1), :].index
x_group_rolling = x.rolling(window=self.freq_str)
for x_rolling in x_group_rolling:
if x_rolling.index[-1] in indices_nan:
x_windows.append(x_rolling)
x_windows_indices.append(x_rolling.index)
else:
columns_index_ = columns_index[0] if len(columns_index) == 1 else columns_index
for x_group in tqdm(x.groupby(by=columns_index_), disable=True, leave=False):
x_group_index = [x_group[0]] if len(columns_index) == 1 else x_group[0]
x_group_value = x_group[1].droplevel(columns_index)
indices_nan = x_group_value.loc[x_group_value.isna().any(axis=1), :].index
x_group_rolling = x_group_value.rolling(window=self.freq_str)
for x_rolling in x_group_rolling:
if x_rolling.index[-1] in indices_nan:
x_windows.append(x_rolling)
x_rolling_ = x_rolling.copy()
for idx, col in enumerate(columns_index):
x_rolling_[col] = x_group_index[idx]
x_rolling_ = x_rolling_.set_index(columns_index, append=True)
x_rolling_ = x_rolling_.reorder_levels(x.index.names)
x_windows_indices.append(x_rolling_.index)
else:
for x_w in x.resample(rule=self.freq_str, level=self.index_datetime):
x_windows.append(x_w[1])
x_windows_indices.append(x_w[1].index)
x_windows_processed = []
x_windows_mask_processed = []
self.size_window = np.max([w.shape[0] for w in x_windows])
for x_w in x_windows:
x_w_fillna = x_w.bfill()
x_w_fillna = x_w_fillna.fillna(x.mean())
x_w_norm = self.normalizer_x.transform(x_w_fillna.values)
x_w_mask = ~x_w.isna().to_numpy()
x_w_shape = x_w.shape
if x_w_shape[0] < self.size_window:
npad = [(0, self.size_window - x_w_shape[0]), (0, 0)]
x_w_norm = np.pad(x_w_norm, pad_width=npad, mode="wrap")
x_w_mask = np.pad(
x_w_mask,
pad_width=npad,
mode="constant",
constant_values=1,
)
x_windows_processed.append(x_w_norm)
x_windows_mask_processed.append(x_w_mask)
if mask is not None:
x_windows_mask_processed = []
for x_window_indices in x_windows_indices:
x_m = mask.loc[x_window_indices]
x_m_mask = x_m.to_numpy()
x_m_shape = x_m.shape
if x_m_shape[0] < self.size_window:
npad = [(0, self.size_window - x_m_shape[0]), (0, 0)]
x_m_mask = np.pad(
x_m_mask,
pad_width=npad,
mode="constant",
constant_values=1,
)
x_windows_mask_processed.append(x_m_mask)
return (
np.array(x_windows_processed),
np.array(x_windows_mask_processed),
x_windows_indices,
)
def _process_reversely_data(
self, x_imputed: np.ndarray, x_input: pd.DataFrame, x_indices: List
):
x_imputed_nan_only = []
x_indices_nan_only = []
for x_imputed_batch, x_indices_batch in zip(x_imputed, x_indices):
imputed_index = x_indices_batch.shape[0] - 1
x_imputed_nan_only.append(x_imputed_batch[imputed_index])
x_indices_nan_only.append(x_indices_batch[imputed_index])
if len(np.shape(x_indices_nan_only)) == 1:
x_out_index = pd.Index(x_indices_nan_only, name=x_input.index.names[0])
else:
x_out_index = pd.MultiIndex.from_tuples(x_indices_nan_only, names=x_input.index.names)
x_normalized = self.normalizer_x.inverse_transform(x_imputed_nan_only)
x_out = pd.DataFrame(
x_normalized,
columns=self.columns,
index=x_out_index,
)
x_final = x_input.copy()
x_final.loc[x_out.index] = x_out.loc[x_out.index]
return x_final
[docs] def fit(
self,
x: pd.DataFrame,
epochs: int = 10,
batch_size: int = 100,
print_valid: bool = False,
x_valid: pd.DataFrame = None,
metrics_valid: Tuple[Callable, ...] = (
metrics.mean_absolute_error,
metrics.dist_wasserstein,
),
round: int = 10,
cols_imputed: Tuple[str, ...] = (),
index_datetime: str = "",
freq_str: str = "1D",
) -> "TsDDPM":
"""Fit data.
Parameters
----------
x : pd.DataFrame
Input dataframe
epochs : int, optional
Number of epochs, by default 10
batch_size : int, optional
Batch size, by default 100
print_valid : bool, optional
Print model performance for after several epochs, by default False
x_valid : pd.DataFrame, optional
Dataframe for validation, by default None
metrics_valid : Tuple[Callable, ...], optional
Set of validation metrics, by default (metrics.mean_absolute_error,
metrics.dist_wasserstein)
round : int, optional
Number of decimal places to round to, by default 10
cols_imputed : Tuple[str, ...], optional
Name of columns that need to be imputed, by default ()
index_datetime : str
Name of datetime-like index
freq_str : str
Frequency string of DateOffset of Pandas
Raises
------
ValueError
Batch size is larger than data size
Returns
-------
Self
Return Self
"""
if index_datetime == "":
raise ValueError(
"Please set the params index_datetime "
"(the name of datetime-like index column). "
f" Suggestions: {x.index.names}"
)
self.index_datetime = index_datetime
self.freq_str = freq_str
super().fit(
x,
epochs,
batch_size,
print_valid,
x_valid,
metrics_valid,
round,
cols_imputed,
)
return self