Source code for qolmat.benchmark.metrics

"""Script for metrics."""

from functools import partial
from typing import Callable, Dict, List

import dcor
import numpy as np
import pandas as pd
import scipy
from numpy.linalg import LinAlgError
from sklearn import metrics as skm

from qolmat.utils import algebra, utils
from qolmat.utils.exceptions import NotEnoughSamples

EPS = np.finfo(float).eps

###########################
# Column-wise metrics     #
###########################


def columnwise_metric(
    df1: pd.DataFrame,
    df2: pd.DataFrame,
    df_mask: pd.DataFrame,
    metric: Callable,
    type_cols: str = "all",
    **kwargs,
) -> pd.Series:
    """Compute column-wise metrics.

    For each column, compute a metric score based on the true dataframe
    and the predicted dataframe

    Parameters
    ----------
    df1 : pd.DataFrame
        True dataframe
    df2 : pd.DataFrame
        Predicted dataframe
    df_mask : pd.DataFrame
        Elements of the dataframes to compute on
    metric : Callable
        metric function
    type_cols : str
        Can be either:
        - `all` to apply the metric to all columns
        - `numerical` to apply the metric to numerical columns only
        - `categorical` to apply the metric to categorical columns only
    **kwargs: dict
        additional arguments

    Returns
    -------
    pd.Series
        Series of scores for all columns

    """
    try:
        pd.testing.assert_index_equal(df1.columns, df2.columns)
    except AssertionError:
        raise ValueError(
            f"Input dataframes do not have the same columns! ({df1.columns} != {df2.columns})"
        )
    if type_cols == "all":
        cols = df1.columns.tolist()
    elif type_cols == "numerical":
        cols = utils._get_numerical_features(df1)
    elif type_cols == "categorical":
        cols = utils._get_categorical_features(df1)
    else:
        raise ValueError(f"Value {type_cols} is not valid for parameter `type_cols`!")
    if cols == []:
        raise ValueError(f"No column found for the type {type_cols}!")
    values = {}
    for col in cols:
        df1_col = df1.loc[df_mask[col], col]
        df2_col = df2.loc[df_mask[col], col]
        if df1_col.isna().any() or df2_col.isna().any():
            raise ValueError(f"Column {col} contains NaN.")
        values[col] = metric(df1_col, df2_col, **kwargs)

    return pd.Series(values)


[docs]def mean_squared_error(df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame) -> pd.Series: """Mean squared error between two dataframes. Parameters ---------- df1 : pd.DataFrame True dataframe df2 : pd.DataFrame Predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series """ return columnwise_metric(df1, df2, df_mask, skm.mean_squared_error, type_cols="numerical")
[docs]def root_mean_squared_error( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame ) -> pd.Series: """Compute the root mean squared error between two dataframes. Parameters ---------- df1 : pd.DataFrame True dataframe df2 : pd.DataFrame Predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series """ return columnwise_metric(df1, df2, df_mask, skm.root_mean_squared_error, type_cols="numerical")
[docs]def mean_absolute_error(df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame) -> pd.Series: """Compute the mean absolute error between two dataframes. Parameters ---------- df1 : pd.DataFrame True dataframe df2 : pd.DataFrame Predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series """ return columnwise_metric(df1, df2, df_mask, skm.mean_absolute_error, type_cols="numerical")
[docs]def mean_absolute_percentage_error( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame ) -> pd.Series: """Compute the mean absolute percentage error between two dataframes. Parameters ---------- df1 : pd.DataFrame True dataframe df2 : pd.DataFrame Predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series """ return columnwise_metric( df1, df2, df_mask, skm.mean_absolute_percentage_error, type_cols="numerical", )
def _weighted_mean_absolute_percentage_error_1D(values1: pd.Series, values2: pd.Series) -> float: """Compute the weighted mean absolute perc. error between 2 series. Based on https://en.wikipedia.org/wiki/Mean_absolute_percentage_error Parameters ---------- values1 : pd.Series True values values2 : pd.Series Predicted values Returns ------- float Weighted mean absolute percentage error """ return (values1 - values2).abs().sum() / values1.abs().sum()
[docs]def weighted_mean_absolute_percentage_error( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame ) -> pd.Series: """Compute the weighted mean absolute percentage error between 2 df. Parameters ---------- df1 : pd.DataFrame True dataframe df2 : pd.DataFrame Predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series """ return columnwise_metric( df1, df2, df_mask, _weighted_mean_absolute_percentage_error_1D, type_cols="numerical", )
[docs]def accuracy(df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame) -> pd.Series: """Compute the matching ratio between the two datasets. Parameters ---------- df1 : pd.DataFrame True dataframe df2 : pd.DataFrame Predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series """ return columnwise_metric( df1, df2, df_mask, accuracy_1D, type_cols="all", )
def accuracy_1D(values1: pd.Series, values2: pd.Series) -> float: """Compute the matching ratio between the set of values. Parameters ---------- values1 : pd.Series True values values2 : pd.Series Predicted values Returns ------- float accuracy """ return (values1 == values2).mean()
[docs]def dist_wasserstein( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, method: str = "columnwise", ) -> pd.Series: """Compute the Wasserstein distances between columns of 2 dataframes. Wasserstein distance can only be computed columnwise. Parameters ---------- df1 : pd.DataFrame True dataframe df2 : pd.DataFrame Predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on method : str, optional columnwise or not Returns ------- pd.Series wasserstein distances """ if method == "columnwise": return columnwise_metric(df1, df2, df_mask, scipy.stats.wasserstein_distance) else: raise AssertionError( f"The parameter of the function wasserstein_distance should " "be one of the following: " f"[`columnwise`], not `{method}`!" )
def kolmogorov_smirnov_test_1D(df1: pd.Series, df2: pd.Series) -> float: """Compute KS test statistic. Compute KS test stat. of the two-sample Kolmogorov-Smirnov test for goodness of fit. See more in https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.ks_2samp.html. Parameters ---------- df1 : pd.Series true series df2 : pd.Series predicted series Returns ------- float KS test statistic """ return scipy.stats.ks_2samp(df1, df2)[0]
[docs]def kolmogorov_smirnov_test( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame ) -> pd.Series: """Compute the Kolmogorov Smirnov Test for numerical features. Lower score means better performance. Parameters ---------- df1 : pd.DataFrame true dataframe df2 : pd.DataFrame predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series KS test statistic """ return columnwise_metric(df1, df2, df_mask, kolmogorov_smirnov_test_1D, type_cols="numerical")
def _total_variance_distance_1D(df1: pd.Series, df2: pd.Series) -> float: """Compute Total Variance Distance for a categorical feature. It is based on TVComplement in https://github.com/sdv-dev/SDMetrics Parameters ---------- df1 : pd.Series true series df2 : pd.Series predicted series Returns ------- float Total variance distance """ list_categories = list(set(df1.unique()).union(set(df2.unique()))) freqs1 = df1.value_counts() / len(df1) freqs1 = freqs1.reindex(list_categories, fill_value=0.0) freqs2 = df2.value_counts() / len(df2) freqs2 = freqs2.reindex(list_categories, fill_value=0.0) return (freqs1 - freqs2).abs().sum()
[docs]def total_variance_distance( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame ) -> pd.Series: """Compute the total variance distance for categorical features. It is based on TVComplement in https://github.com/sdv-dev/SDMetrics Parameters ---------- df1 : pd.DataFrame true dataframe df2 : pd.DataFrame predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series Total variance distance """ return columnwise_metric( df1, df2, df_mask, _total_variance_distance_1D, type_cols="categorical", )
def _check_same_number_columns(df1: pd.DataFrame, df2: pd.DataFrame): if len(df1.columns) != len(df2.columns): raise Exception("inputs have to have the same number of columns.") def _get_correlation_pearson_matrix(df: pd.DataFrame, use_p_value: bool = True) -> pd.DataFrame: """Get matrix of correlation values for numerical features. Based on Pearson correlation coefficient or p-value for testing non-correlation. Parameters ---------- df : pd.DataFrame dataframe use_p_value : bool, optional use the p-value instead of the correlation coefficient, by default True Returns ------- pd.DataFrame Correlation matrix """ cols = df.columns.tolist() matrix = np.zeros((len(df.columns), len(df.columns))) for idx_1, col_1 in enumerate(cols): for idx_2, col_2 in enumerate(cols): res = scipy.stats.mstats.pearsonr(df[[col_1]].values, df[[col_2]].values) if use_p_value: matrix[idx_1, idx_2] = res[1] else: matrix[idx_1, idx_2] = res[0] return pd.DataFrame(matrix, index=cols, columns=cols)
[docs]def mean_difference_correlation_matrix_numerical_features( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, use_p_value: bool = True, ) -> pd.Series: """Compute the mean absolute of differences. Computed between the correlation matrices of df1 and df2. based on Pearson correlation coefficient or p-value for testing non-correlation. Parameters ---------- df1 : pd.DataFrame true dataframe df2 : pd.DataFrame predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on use_p_value : bool, optional use the p-value instead of the correlation coefficient, by default True Returns ------- pd.Series Mean absolute of differences for each feature """ df1 = df1[df_mask].dropna(axis=0) df2 = df2[df_mask].dropna(axis=0) _check_same_number_columns(df1, df2) cols_numerical = utils._get_numerical_features(df1) if cols_numerical == []: raise Exception("No numerical feature found") df_corr1 = _get_correlation_pearson_matrix(df1[cols_numerical], use_p_value=use_p_value) df_corr2 = _get_correlation_pearson_matrix(df2[cols_numerical], use_p_value=use_p_value) diff_corr = (df_corr1 - df_corr2).abs().mean(axis=1) return pd.Series(diff_corr, index=cols_numerical)
def _get_correlation_chi2_matrix(data: pd.DataFrame, use_p_value: bool = True) -> pd.DataFrame: """Get matrix of correlation values for categorical features. Based on Chi-square test of independence of variables (the test statistic or the p-value). Parameters ---------- data : pd.DataFrame dataframe use_p_value : bool, optional use the p-value of the test instead of the test statistic, by default True Returns ------- pd.DataFrame Correlation matrix """ cols = data.columns.tolist() matrix = np.zeros((len(data.columns), len(data.columns))) for idx_1, col_1 in enumerate(cols): for idx_2, col_2 in enumerate(cols): freq = data.pivot_table( index=col_1, columns=col_2, aggfunc="size", fill_value=0 ).to_numpy() res = scipy.stats.chi2_contingency(freq) if use_p_value: matrix[idx_1, idx_2] = res[1] else: matrix[idx_1, idx_2] = res[0] return pd.DataFrame(matrix, index=cols, columns=cols)
[docs]def mean_difference_correlation_matrix_categorical_features( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, use_p_value: bool = True, ) -> pd.Series: """Compute the mean absolute of differences. Computed between the correlation matrix of df1 and df2 based on Chi-square test of independence of variables (the test statistic or the p-value) Parameters ---------- df1 : pd.DataFrame true dataframe df2 : pd.DataFrame predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on use_p_value : bool, optional use the p-value of the test instead of the test statistic, by default True Returns ------- pd.Series Mean absolute of differences for each feature """ df1 = df1[df_mask].dropna(axis=0) df2 = df2[df_mask].dropna(axis=0) _check_same_number_columns(df1, df2) cols_categorical = utils._get_categorical_features(df1) if cols_categorical == []: raise Exception("No categorical feature found") df_corr1 = _get_correlation_chi2_matrix(df1[cols_categorical], use_p_value=use_p_value) df_corr2 = _get_correlation_chi2_matrix(df2[cols_categorical], use_p_value=use_p_value) diff_corr = (df_corr1 - df_corr2).abs().mean(axis=1) return pd.Series(diff_corr, index=cols_categorical)
def _get_correlation_f_oneway_matrix( df: pd.DataFrame, cols_categorical: List[str], cols_numerical: List[str], use_p_value: bool = True, ) -> pd.DataFrame: """Get matrix of correlation values. Computed between categorical and numerical features based on the one-way ANOVA. Parameters ---------- df : pd.DataFrame dataframe cols_categorical : List[str] list categorical columns cols_numerical : List[str] list numerical columns use_p_value : bool, optional use the p-value of the test instead of the test statistic, by default True Returns ------- pd.DataFrame Correlation matrix """ matrix = np.zeros((len(cols_categorical), len(cols_numerical))) for idx_cat, col_cat in enumerate(cols_categorical): for idx_num, col_num in enumerate(cols_numerical): category_group_lists = df.groupby(col_cat)[col_num].apply(list) res = scipy.stats.f_oneway(*category_group_lists) if use_p_value: matrix[idx_cat, idx_num] = res[1] else: matrix[idx_cat, idx_num] = res[0] return pd.DataFrame(matrix, index=cols_categorical, columns=cols_numerical)
[docs]def mean_diff_corr_matrix_categorical_vs_numerical_features( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, use_p_value: bool = True, ) -> pd.Series: """Compute the mean absolute of differences. Computation between the correlation matrix of df1 and df2 based on the one-way ANOVA. Parameters ---------- df1 : pd.DataFrame true dataframe df2 : pd.DataFrame predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on use_p_value : bool, optional use the p-value of the test instead of the test statistic, by default True Returns ------- pd.Series Mean absolute of differences for each feature """ df1 = df1[df_mask].dropna(axis=0) df2 = df2[df_mask].dropna(axis=0) _check_same_number_columns(df1, df2) cols_categorical = utils._get_categorical_features(df1) if cols_categorical == []: raise Exception("No categorical feature found") cols_numerical = utils._get_numerical_features(df1) if cols_numerical == []: raise Exception("No numerical feature found") df_corr1 = _get_correlation_f_oneway_matrix( df1, cols_categorical, cols_numerical, use_p_value=use_p_value ) df_corr2 = _get_correlation_f_oneway_matrix( df2, cols_categorical, cols_numerical, use_p_value=use_p_value ) diff_corr = (df_corr1 - df_corr2).abs().mean(axis=1) return pd.Series(diff_corr, index=cols_categorical)
########################### # Row-wise metrics # ########################### def _sum_manhattan_distances_1D(values: pd.Series) -> float: """Compute the sum of Manhattan distances computed for one column. It is based on https://www.geeksforgeeks.org/sum-manhattan-distances-pairs-points/ Parameters ---------- values : pd.Series Values of a column Returns ------- float Sum of Manhattan distances """ values = values.sort_values(ascending=True) sums_partial = values.shift().fillna(0.0).cumsum() differences_partial = values * np.arange(len(values)) - sums_partial res = differences_partial.sum() return res def _sum_manhattan_distances(df1: pd.DataFrame) -> float: """Compute the sum Manhattan distances between all pairs of rows. It is based on https://www.geeksforgeeks.org/sum-manhattan-distances-pairs-points/ Parameters ---------- df1 : pd.DataFrame input dataframe Returns ------- float Sum of Manhattan distances for all pairs of rows. """ cols = df1.columns.tolist() result = sum([_sum_manhattan_distances_1D(df1[col]) for col in cols]) return result
[docs]def sum_energy_distances(df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame) -> pd.Series: """Compute the sum of energy distances between df1 and df2. It is based on https://dcor.readthedocs.io/en/latest/theory.html# Parameters ---------- df1 : pd.DataFrame true dataframe df2 : pd.DataFrame predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series Sum of energy distances between df1 and df2. """ # Replace nan in dataframe df1 = df1[df_mask].fillna(0.0) df2 = df2[df_mask].fillna(0.0) # sum of (len_df1 * (len_df1 - 1) / 2) distances for df1 sum_distances_df1 = _sum_manhattan_distances(df1) sum_distances_df2 = _sum_manhattan_distances(df2) df = pd.concat([df1, df2]) sum_distances_df1_df2 = _sum_manhattan_distances(df) sum_distance = 2 * sum_distances_df1_df2 - 4 * sum_distances_df1 - 4 * sum_distances_df2 return pd.Series(sum_distance, index=["All"])
def sum_pairwise_distances( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, metric: str = "cityblock", ) -> float: """Compute the sum of pairwise distances based on a predefined metric. Metrics are found in this link https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.cdist.html Parameters ---------- df1 : pd.DataFrame First empirical distribution without nans df2 : pd.DataFrame Second empirical distribution without nans df_mask : pd.DataFrame Elements of the dataframes to compute on metric : str, optional distance metric, by default 'cityblock' Returns ------- float Sum of pairwise distances based on a predefined metric """ df1 = df1[df_mask.any(axis=1)] df2 = df2[df_mask.any(axis=1)] distances = np.sum(scipy.spatial.distance.cdist(df1, df2, metric=metric)) return distances ########################### # Dataframe-wise metrics # ########################### def frechet_distance_base( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, ) -> pd.Series: """Compute the Fréchet distance between two dataframes df1 and df2. Frechet_distance = || mu_1 - mu_2 ||_2^2 + Tr(Sigma_1 + Sigma_2 - 2(Sigma_1 . Sigma_2)^(1/2)) It is normalized, df1 and df2 are first scaled by a factor (std(df1) + std(df2)) / 2, and then centered around (mean(df1) + mean(df2)) / 2 Based on: Dowson, D. C., and BV666017 Landau. "The Fréchet distance between multivariate normal distributions." Journal of multivariate analysis 12.3 (1982): 450-455. Parameters ---------- df1 : pd.DataFrame true dataframe df2 : pd.DataFrame predicted dataframe df_mask : pd.DataFrame Elements of the dataframes to compute on Returns ------- pd.Series Frechet distance in a Series object """ if df1.shape != df2.shape or df1.shape != df_mask.shape: raise Exception("inputs have to be of same dimensions.") df1 = df1.copy() df2 = df2.copy() # Set to nan the values not in the mask df1[~df_mask] = np.nan df2[~df_mask] = np.nan std = (np.std(df1) + np.std(df2) + EPS) / 2 mu = (np.nanmean(df1, axis=0) + np.nanmean(df2, axis=0)) / 2 df1 = (df1 - mu) / std df2 = (df2 - mu) / std means1, cov1 = utils.nan_mean_cov(df1.values) means2, cov2 = utils.nan_mean_cov(df2.values) distance = algebra.frechet_distance_exact(means1, cov1, means2, cov2) return pd.Series(distance, index=["All"])
[docs]def frechet_distance( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, method: str = "single", min_n_rows: int = 10, ) -> pd.Series: """Compute Frechet distance computed using a pattern decomposition. Several variant are implemented: i) the `single` method relies on a single estimation of the means and covariance matrix. It is relevent for MCAR data. ii) the `pattern` method relies on the aggregation of the estimated distance between each pattern. It is relevent for MAR data. Parameters ---------- df1 : pd.DataFrame First empirical distribution df2 : pd.DataFrame Second empirical distribution df_mask : pd.DataFrame Mask indicating on which values the distance has to computed on method: str Method used to compute the distance on multivariate datasets with missing values. Possible values are `robust` and `pattern`. min_n_rows: int Minimum number of rows for a KL estimation Returns ------- pd.Series Series of computed metrics """ if method == "single": return frechet_distance_base(df1, df2, df_mask) return pattern_based_weighted_mean_metric( df1, df2, df_mask, frechet_distance_base, min_n_rows=min_n_rows, type_cols="numerical", )
def kl_divergence_1D(df1: pd.Series, df2: pd.Series) -> float: """Estimate the Kullback-Leibler divergence for 1D. Computation between the two 1D empirical distributions given by `df1`and `df2`. The samples are binarized using a uniform spacing with 20 bins from the smallest to the largest value. Not that this may be a coarse estimation. Parameters ---------- df1 : pd.Series First empirical distribution df2 : pd.Series Second empirical distribution Returns ------- float Kullback-Leibler divergence between the two empirical distributions. """ min_val = min(df1.min(), df2.min()) max_val = max(df1.max(), df2.max()) bins = np.linspace(min_val, max_val, 20) p = np.histogram(df1, bins=bins, density=True)[0] q = np.histogram(df2, bins=bins, density=True)[0] return scipy.stats.entropy(p + EPS, q + EPS) def kl_divergence_gaussian(df1: pd.DataFrame, df2: pd.DataFrame) -> float: """Compute Kullback-Leibler divergence estimation. Computation based on a Gaussian approximation of both empirical distributions Parameters ---------- df1 : pd.DataFrame First empirical distribution df2 : pd.DataFrame Second empirical distribution Returns ------- pd.Series Series of estimated metrics """ cov1 = df1.cov().values cov2 = df2.cov().values means1 = np.array(df1.mean()) means2 = np.array(df2.mean()) try: div_kl = algebra.kl_divergence_gaussian_exact(means1, cov1, means2, cov2) except LinAlgError: raise ValueError( "Provided datasets have degenerate colinearities, KL-divergence cannot be computed!" ) return div_kl
[docs]def kl_divergence( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, method: str = "columnwise", min_n_rows: int = 10, ) -> pd.Series: """Estimate the KL divergence. Estimation of the Kullback-Leibler divergence between too empirical distributions. Three methods are implemented: - columnwise, relying on a uniform binarization and only taking marginals into account (https://en.wikipedia.org/wiki/Kullback%E2%80%93Leibler_divergence), - gaussian, relying on a Gaussian approximation, Parameters ---------- df1 : pd.DataFrame First empirical distribution df2 : pd.DataFrame Second empirical distribution df_mask: pd.DataFrame Mask indicating on what values the divergence should be computed method: str Method used to compute the divergence on multivariate datasets with missing values. Possible values are `columnwise` and `gaussian`. min_n_rows: int Minimum number of rows for a KL estimation Returns ------- pd.Series Kullback-Leibler divergence Raises ------ AssertionError If the empirical distributions do not have enough samples to estimate a KL divergence. Consider using a larger dataset of lowering the parameter `min_n_rows`. """ if method == "columnwise": return columnwise_metric(df1, df2, df_mask, kl_divergence_1D, type_cols="numerical") elif method == "gaussian": return pattern_based_weighted_mean_metric( df1, df2, df_mask, kl_divergence_gaussian, min_n_rows=min_n_rows, type_cols="numerical", ) else: raise AssertionError( f"The parameter of the function wasserstein_distance " "should be one of the following: " f"[`columnwise`, `gaussian`], not `{method}`!" )
def distance_anticorr(df1: pd.DataFrame, df2: pd.DataFrame) -> float: """Compute distance anticorr. Score based on the distance anticorrelation between two empirical distributions. The theoretical basis can be found on dcor documentation: https://dcor.readthedocs.io/en/latest/theory.html Parameters ---------- df1 : pd.DataFrame Dataframe representing the first empirical distribution df2 : pd.DataFrame Dataframe representing the second empirical distribution Returns ------- float Distance correlation score """ return (1 - dcor.distance_correlation(df1.values, df2.values)) / 2 def distance_anticorr_pattern( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, min_n_rows: int = 10, ) -> pd.Series: """Compute correlation distance computed using a pattern decomposition. Parameters ---------- df1 : pd.DataFrame First empirical distribution df2 : pd.DataFrame Second empirical distribution df_mask : pd.DataFrame Mask indicating on which values the distance has to computed on min_n_rows: int Minimum number of rows for a KL estimation Returns ------- pd.Series Series of computed metrics """ return pattern_based_weighted_mean_metric( df1, df2, df_mask, distance_anticorr, min_n_rows=min_n_rows, type_cols="numerical", )
[docs]def pattern_based_weighted_mean_metric( df1: pd.DataFrame, df2: pd.DataFrame, df_mask: pd.DataFrame, metric: Callable, min_n_rows: int = 10, type_cols: str = "all", **kwargs, ) -> pd.Series: """Compute a mean score based on missing patterns. Note that for each pattern, a score is returned by the function metric. This code is based on https://www.statsmodels.org/ Parameters ---------- df1 : pd.DataFrame Dataframe representing the first empirical distribution, with nans df2 : pd.DataFrame Dataframe representing the second empirical distribution df_mask : pd.DataFrame Elements of the dataframes to compute on metric : Callable metric function min_n_rows : int, optional minimum number of row allowed for a pattern without nan, by default 10 type_cols : str, optional type of the columns ("all", "numerical", "categorical") **kwargs : dict additional arguments Returns ------- pd.Series _description_ """ if type_cols == "all": cols = df1.columns elif type_cols == "numerical": cols = df1.select_dtypes(include=["number"]).columns elif type_cols == "categorical": cols = df1.select_dtypes(exclude=["number"]).columns else: raise ValueError(f"Value {type_cols} is not valid for parameter `type_cols`!") if np.any(df_mask & df1.isna()): raise ValueError("The argument df1 has missing values on the mask!") if np.any(df_mask & df2.isna()): raise ValueError("The argument df2 has missing values on the mask!") rows_mask = df_mask.any(axis=1) scores = [] weights = [] df1 = df1[cols].loc[rows_mask] df2 = df2[cols].loc[rows_mask] df_mask = df_mask[cols].loc[rows_mask] max_num_row = 0 for tup_pattern, df_mask_pattern in df_mask.groupby(df_mask.columns.tolist()): ind_pattern = df_mask_pattern.index df1_pattern = df1.loc[ind_pattern, list(tup_pattern)] max_num_row = max(max_num_row, len(df1_pattern)) if not any(tup_pattern) or len(df1_pattern) < min_n_rows: continue df2_pattern = df2.loc[ind_pattern, list(tup_pattern)] weights.append(len(df1_pattern) / len(df1)) scores.append(metric(df1_pattern, df2_pattern, **kwargs)) if len(scores) == 0: raise NotEnoughSamples(max_num_row, min_n_rows) return pd.Series(sum([s * w for s, w in zip(scores, weights)]), index=["All"])
def get_metric( name: str, ) -> Callable[[pd.DataFrame, pd.DataFrame, pd.DataFrame], pd.Series]: """Get metric. Parameters ---------- name : str name of the metic to compute Returns ------- Callable[[pd.DataFrame, pd.DataFrame, pd.DataFrame], pd.Series] metric """ dict_metrics: Dict[str, Callable] = { "mse": mean_squared_error, "rmse": root_mean_squared_error, "mae": mean_absolute_error, "wmape": weighted_mean_absolute_percentage_error, "accuracy": accuracy, "wasserstein_columnwise": dist_wasserstein, "kl_columnwise": partial(kl_divergence, method="columnwise"), "kl_gaussian": partial(kl_divergence, method="gaussian"), "ks_test": kolmogorov_smirnov_test, "correlation_diff": (mean_difference_correlation_matrix_numerical_features), "energy": sum_energy_distances, "frechet": partial(frechet_distance, method="single"), "frechet_pattern": partial(frechet_distance, method="pattern"), "dist_corr_pattern": distance_anticorr_pattern, } return dict_metrics[name]