# Source code for qolmat.benchmark.comparator

"""Script for comparator."""

import logging
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from joblib import Parallel, cpu_count, delayed
from sklearn import utils as sku

from qolmat.benchmark import hyperparameters, metrics
from qolmat.benchmark.missing_patterns import _HoleGenerator

# Configure the root logger when this module is imported: INFO threshold and
# timestamped, level-aligned records. `basicConfig` is a no-op if the root
# logger already has handlers.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s %(levelname)-8s %(message)s",
)


class Comparator:
    """Comparator class.

    This class implements a comparator for evaluating different imputation
    methods on data in which holes have been created artificially.

    Parameters
    ----------
    dict_models : Dict[str, Any]
        Dictionary of imputation methods, keyed by imputer name.
    generator_holes : _HoleGenerator
        Hole generator. Its ``split`` method provides the masks and its
        ``subset`` attribute the columns on which errors are computed.
    metrics : Optional[List[str]], optional
        Names of the metrics to compute; each name is resolved with
        ``qolmat.benchmark.metrics.get_metric``. By default
        ``["mae", "wmape", "kl_columnwise"]``.
    dict_config_opti : Optional[Dict[str, Any]], optional
        Dictionary of search space for each imputation method, keyed by
        imputer name. Imputers without an entry get an empty configuration.
        By default, the value is set to {}.
    metric_optim : str, optional
        Name of the metric forwarded to the hyperparameter optimisation,
        by default "mse".
    max_evals : int, optional
        Number of calls of the optimization algorithm, by default 10.
    verbose : bool, optional
        Verbosity flag forwarded to the hyperparameter optimisation,
        by default False.

    """

    # Immutable source for the default metrics; a fresh list is built per
    # instance so that instances never share (and mutate) the same object.
    _DEFAULT_METRICS = ("mae", "wmape", "kl_columnwise")

    def __init__(
        self,
        dict_models: Dict[str, Any],
        generator_holes: "_HoleGenerator",
        metrics: Optional[List[str]] = None,
        dict_config_opti: Optional[Dict[str, Any]] = None,
        metric_optim: str = "mse",
        max_evals: int = 10,
        verbose: bool = False,
    ):
        self.dict_imputers = dict_models
        self.generator_holes = generator_holes
        self.metrics = list(self._DEFAULT_METRICS) if metrics is None else metrics
        # `None` is accepted by the annotation, so normalise it to an empty
        # mapping: the processing methods call `.get` on this attribute.
        self.dict_config_opti = {} if dict_config_opti is None else dict_config_opti
        self.metric_optim = metric_optim
        self.max_evals = max_evals
        self.verbose = verbose

    def get_errors(
        self,
        df_origin: pd.DataFrame,
        df_imputed: pd.DataFrame,
        df_mask: pd.DataFrame,
    ) -> pd.DataFrame:
        """Get errors - estimate the reconstruction's quality.

        Parameters
        ----------
        df_origin : pd.DataFrame
            reference/original signal
        df_imputed : pd.DataFrame
            imputed signal
        df_mask : pd.DataFrame
            masked dataframe (NA)

        Returns
        -------
        pd.DataFrame
            Results obtained via the different metrics, concatenated with
            the metric name as outermost index level.

        """
        dict_errors = {
            name_metric: metrics.get_metric(name_metric)(df_origin, df_imputed, df_mask)
            for name_metric in self.metrics
        }
        return pd.concat(list(dict_errors.values()), keys=list(dict_errors))

    def _get_subset(self) -> Any:
        """Return the hole generator's ``subset``, raising if it is unset."""
        subset = self.generator_holes.subset
        if subset is None:
            raise ValueError(
                "HoleGenerator `subset` should be overwritten in split but it is none!"
            )
        return subset

    def _optimize_imputer(self, imputer_name: str, imputer: Any, df_origin: pd.DataFrame) -> Any:
        """Run the hyperparameter optimisation of one imputer on `df_origin`."""
        dict_config_opti_imputer = self.dict_config_opti.get(imputer_name, {})
        return hyperparameters.optimize(
            imputer,
            df_origin,
            self.generator_holes,
            self.metric_optim,
            dict_config_opti_imputer,
            max_evals=self.max_evals,
            verbose=self.verbose,
        )

    def process_split(self, split_data: Tuple[int, pd.DataFrame, pd.DataFrame]) -> pd.DataFrame:
        """Process a split.

        Every imputer is optimised, fitted on the data with holes and scored
        on the generator's subset of columns.

        Parameters
        ----------
        split_data : Tuple
            contains (split_idx, df_mask, df_origin)

        Returns
        -------
        pd.DataFrame
            errors results, one column group per imputer

        Raises
        ------
        ValueError
            If the hole generator's ``subset`` is None.

        """
        self.generator_holes.random_state = sku.check_random_state(
            self.generator_holes.random_state
        )
        self.generator_holes.save_rng_state()
        # NOTE(review): the saved RNG state is restored once per imputer before
        # any work is done, which looks redundant. The call count is kept
        # as-is; confirm against `_HoleGenerator.load_rng_state`.
        for _ in self.dict_imputers:
            self.generator_holes.load_rng_state()

        _, df_mask, df_origin = split_data
        df_with_holes = df_origin.copy()
        df_with_holes[df_mask] = np.nan

        subset = self._get_subset()

        split_results = {}
        for imputer_name, imputer in self.dict_imputers.items():
            imputer_opti = self._optimize_imputer(imputer_name, imputer, df_origin)
            df_imputed = imputer_opti.fit_transform(df_with_holes)
            split_results[imputer_name] = self.get_errors(
                df_origin[subset], df_imputed[subset], df_mask[subset]
            )

        return pd.concat(split_results, axis=1)

    def process_imputer(
        self, imputer_data: Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
    ) -> Tuple[str, pd.DataFrame]:
        """Process an imputer.

        The imputer is optimised once, then fitted and scored on every mask;
        the errors are averaged over the masks.

        Parameters
        ----------
        imputer_data : Tuple[str, Any, List[pd.DataFrame], pd.DataFrame]
            contains (imputer_name, imputer, all_masks, df_origin)

        Returns
        -------
        Tuple[str, pd.DataFrame]
            imputer name, errors results

        Raises
        ------
        ValueError
            If the hole generator's ``subset`` is None.

        """
        imputer_name, imputer, all_masks, df_origin = imputer_data

        subset = self._get_subset()
        imputer_opti = self._optimize_imputer(imputer_name, imputer, df_origin)

        imputer_results = []
        for df_mask in all_masks:
            df_with_holes = df_origin.copy()
            df_with_holes[df_mask] = np.nan
            df_imputed = imputer_opti.fit_transform(df_with_holes)
            imputer_results.append(
                self.get_errors(df_origin[subset], df_imputed[subset], df_mask[subset])
            )

        return imputer_name, pd.concat(imputer_results).groupby(level=[0, 1]).mean()

    def compare(
        self,
        df_origin: pd.DataFrame,
        use_parallel: bool = True,
        n_jobs: int = -1,
        parallel_over: str = "auto",
    ) -> pd.DataFrame:
        """Compare different imputers in parallel with hyperparams opti.

        Parameters
        ----------
        df_origin : pd.DataFrame
            df with missing values
        use_parallel : bool, optional
            if parallelisation, by default True
        n_jobs : int, optional
            number of jobs to use for the parallelisation, by default -1
        parallel_over : str, optional
            'splits' or 'imputers', by default "auto". With "auto", the work
            is distributed over the splits when there are more splits than
            imputers, and over the imputers otherwise.

        Returns
        -------
        pd.DataFrame
            DataFrame (2-level index) with results.
            Columns are imputers.
            0-level index are the metrics.
            1-level index are the column names.

        Raises
        ------
        ValueError
            If `parallel_over` is not "auto", "splits" or "imputers".

        """
        # Fail fast: reject a bad mode before generating any split.
        if parallel_over not in ("auto", "splits", "imputers"):
            raise ValueError("`parallel_over` should be `auto`, `splits` or `imputers`.")

        logging.info("Starting comparison for %s imputers.", len(self.dict_imputers))

        all_splits = list(self.generator_holes.split(df_origin))

        if parallel_over == "auto":
            parallel_over = "splits" if len(all_splits) > len(self.dict_imputers) else "imputers"

        over_splits = parallel_over == "splits"
        if over_splits:
            worker = self.process_split
            tasks = [(i, df_mask, df_origin) for i, df_mask in enumerate(all_splits)]
        else:
            worker = self.process_imputer
            tasks = [
                (name, imputer, all_splits, df_origin)
                for name, imputer in self.dict_imputers.items()
            ]

        if use_parallel:
            logging.info("Parallelisation over: %s...", parallel_over)
            n_jobs = self.get_optimal_n_jobs(tasks, n_jobs)
            results = Parallel(n_jobs=n_jobs)(delayed(worker)(task) for task in tasks)
        else:
            logging.info("Sequential treatment...")
            results = [worker(task) for task in tasks]

        if over_splits:
            # One frame per split: average them over the splits.
            final_results = pd.concat(results).groupby(level=[0, 1]).mean()
        else:
            # (name, frame) pairs already averaged over the masks.
            final_results = pd.concat(dict(results), axis=1)

        logging.info("Comparison successfully terminated.")
        return final_results

    @staticmethod
    def get_optimal_n_jobs(split_data: List, n_jobs: int = -1) -> int:
        """Determine the optimal number of parallel jobs to use.

        If `n_jobs` is specified by the user, that value is used.
        Otherwise, the function returns the minimum between the number of
        CPU cores and the number of tasks (i.e., the length of `split_data`),
        ensuring that no more jobs than tasks are launched. At least one job
        is always returned, even when there is no task.

        Parameters
        ----------
        split_data : List
            A collection of data to be processed in parallel.
            The length of this collection determines the number of tasks.
        n_jobs : int
            The number of jobs (parallel workers) to use, by default -1

        Returns
        -------
        int
            The optimal number of jobs to run in parallel

        """
        if n_jobs != -1:
            return n_jobs
        n_tasks = len(split_data)
        if n_tasks == 0:
            # `joblib.Parallel` rejects n_jobs == 0, so never return it.
            return 1
        return min(cpu_count(), n_tasks)