Source code for qolmat.utils.data

"""Utils data for qolmat package."""

import os
import sys
import zipfile
from datetime import datetime
from math import pi
from typing import Dict, List, Tuple, Union
from urllib import request

import numpy as np
import pandas as pd

from qolmat.benchmark import missing_patterns
from qolmat.utils.utils import RandomSetting

# Absolute path of the directory that contains this module.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Parent directory of CURRENT_DIR; the path is left un-normalised, so it
# still ends with a ".." component.
ROOT_DIR = os.path.join(CURRENT_DIR, os.pardir)


def read_csv_local(data_file_name: str, **kwargs) -> pd.DataFrame:
    """Read one of the CSV files stored in the package ``data`` folder.

    The file is looked up as ``<ROOT_DIR>/data/<data_file_name>.csv``.

    Parameters
    ----------
    data_file_name : str
        Name of the file without its ``.csv`` extension.
        Has to be "beijing" or "conductors".
    **kwargs : dict, optional
        Additional keyword arguments forwarded to `pandas.read_csv`.

    Returns
    -------
    pd.DataFrame
        Content of the CSV file.

    """
    csv_path = os.path.join(ROOT_DIR, "data", f"{data_file_name}.csv")
    return pd.read_csv(csv_path, **kwargs)


def download_data_from_zip(
    zipname: str, urllink: str, datapath: str = "data/"
) -> List[pd.DataFrame]:
    """Download and extract a ZIP archive, then load the files it contains.

    The archive is fetched from ``<urllink>/<zipname>.zip`` and stored as
    ``<datapath>/<zipname>.zip``; it is extracted into the directory
    ``<datapath>/<zipname>``. Both steps are skipped when their result is
    already present on disk, so repeated calls do not hit the network.

    Parameters
    ----------
    zipname : str
        Name of the ZIP file to download, without the '.zip' extension.
    urllink : str
        Base URL where the ZIP file is hosted.
    datapath : str, optional
        Path to the directory where the ZIP will be downloaded and extracted.
        Defaults to 'data/'.

    Returns
    -------
    List[pd.DataFrame]
        The DataFrames returned by `get_dataframes_in_folder` for the
        extracted directory and the '.csv' extension.

    """
    extract_dir = os.path.join(datapath, zipname)
    archive_path = extract_dir + ".zip"
    # URLs always use "/" as separator: build the link by hand instead of
    # with os.path.join, which would insert "\\" on Windows.
    url = f"{urllink.rstrip('/')}/{zipname}.zip"

    os.makedirs(datapath, exist_ok=True)
    if not os.path.exists(extract_dir):
        # Only download when neither the archive nor its extraction exists.
        if not os.path.exists(archive_path):
            request.urlretrieve(url, archive_path)
        with zipfile.ZipFile(archive_path, "r") as zip_ref:
            zip_ref.extractall(extract_dir)
    return get_dataframes_in_folder(extract_dir, ".csv")


def get_dataframes_in_folder(path: str, extension: str) -> List[pd.DataFrame]:
    """Load the dataframes stored under a directory.

    The directory is walked recursively. Every file whose name contains
    `extension` is read with `pandas.read_csv`. As soon as a file whose name
    contains '.tsf' is met, it is converted with `convert_tsf_to_dataframe`
    and returned on its own, discarding whatever was collected before.

    Parameters
    ----------
    path : str
        Path to the directory to search for files.
    extension : str
        Substring looked for in the file names, e.g., '.csv'.

    Returns
    -------
    List[pd.DataFrame]
        The DataFrames loaded from the matching files, or a single-element
        list holding the converted '.tsf' file if one is found.

    """
    frames: List[pd.DataFrame] = []
    for dirpath, _subdirs, filenames in os.walk(path):
        for filename in filenames:
            full_path = os.path.join(dirpath, filename)
            if extension in filename:
                frames.append(pd.read_csv(full_path))
            if ".tsf" in filename:
                # A .tsf file short-circuits the whole search.
                return [convert_tsf_to_dataframe(full_path)]
    return frames


def generate_artificial_ts(
    n_samples: int,
    periods: List[int],
    amp_anomalies: float,
    ratio_anomalies: float,
    amp_noise: float,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Build the three components of an artificial time series.

    All random draws use the global NumPy random state.

    Parameters
    ----------
    n_samples : int
        Number of samples in the time series.
    periods : List[int]
        Periods of the sine waves summed into the signal.
    amp_anomalies : float
        Amplitude multiplier for anomalies.
    ratio_anomalies : float
        Ratio of total samples that will be anomalies.
    amp_noise : float
        Standard deviation of Gaussian noise.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray, np.ndarray]
        X: constant 1 plus one sine wave per period.
        A: zeros everywhere except at randomly chosen positions, which hold
        exponentially distributed anomalies with a random sign.
        E: centred Gaussian noise scaled by `amp_noise`.

    """
    time_steps = np.arange(n_samples)
    X = np.ones(n_samples)
    for period in periods:
        X = X + np.sin(2 * pi * time_steps / period)

    n_anomalies = int(n_samples * ratio_anomalies)
    # The order of the random draws below is part of the behaviour: it
    # determines the output obtained for a given seed.
    magnitudes = np.random.standard_exponential(size=n_anomalies)
    signs = np.random.choice([-1, 1], size=n_anomalies)
    positions = np.random.choice(range(n_samples), size=n_anomalies, replace=False)
    A = np.zeros(n_samples)
    A[positions] = magnitudes * (amp_anomalies * signs)

    E = amp_noise * np.random.normal(size=n_samples)
    return X, A, E


def _monash_series_to_frame(
    loaded_data: pd.DataFrame,
    suffix_column: str,
    freq: str,
    fixed_start: Union[pd.Timestamp, None] = None,
) -> pd.DataFrame:
    """Turn a table of series (one per row) into one column per series.

    Parameters
    ----------
    loaded_data : pd.DataFrame
        Table with the columns "series_name", "series_value",
        `suffix_column` and, when `fixed_start` is None, "start_timestamp".
    suffix_column : str
        Column whose value is appended to the series name, after a space,
        to build the name of the output column.
    freq : str
        Frequency of the generated time index.
    fixed_start : pd.Timestamp, optional
        Common first timestamp of every series. When None, each series
        starts at its own "start_timestamp".

    Returns
    -------
    pd.DataFrame
        Series side by side, truncated to the length of the shortest one.

    """
    columns: List[pd.DataFrame] = []
    # Rows are addressed by the labels 0..n-1, as in the original loops.
    for k in range(len(loaded_data)):
        values = list(loaded_data["series_value"][k])
        if fixed_start is None:
            start = loaded_data["start_timestamp"][k]
        else:
            start = fixed_start
        time_index = pd.date_range(start=start, periods=len(values), freq=freq)
        name = loaded_data["series_name"][k] + " " + loaded_data[suffix_column][k]
        columns.append(pd.DataFrame({name: values}, index=time_index))
    minimum = min(len(column) for column in columns)
    return pd.concat(columns, axis=1)[:minimum]


def get_data(
    name_data: str = "Beijing",
    datapath: str = "data/",
    n_groups_max: int = sys.maxsize,
) -> pd.DataFrame:
    """Download or generate data.

    Parameters
    ----------
    name_data: str, optional
        Name of the dataset, by default "Beijing". Supported values are
        "Beijing", "Superconductor", "Titanic", "Artificial", "SNCF",
        "Beijing_online", "Superconductor_online", "conductor",
        "Monach_weather" and "Monach_electricity_australia".
    datapath : str, optional
        Directory used for downloaded archives and for the "SNCF" parquet
        file, by default "data/"
    n_groups_max : int, optional
        max number of groups, by default sys.maxsize.
        Only used if name_data == "SNCF"

    Returns
    -------
    pd.DataFrame
        requested data

    Raises
    ------
    ValueError
        If `name_data` is not one of the supported names.

    """
    # Base URL ends with "/": paths are appended by plain concatenation
    # (os.path.join would use "\\" on Windows).
    url_zenodo = "https://zenodo.org/record/"

    if name_data == "Beijing":
        df = read_csv_local("beijing")
        df["date"] = pd.to_datetime(df["date"])
        df = df.drop(columns=["year", "month", "day", "hour", "wd"])
        return df.groupby(["station", "date"]).mean()

    if name_data in ("Superconductor", "conductor"):
        return read_csv_local("conductors")

    if name_data == "Titanic":
        # The parentheses are required: without them the second literal is
        # a separate no-op statement and the URL is silently truncated.
        path = (
            "https://gist.githubusercontent.com/fyyying/4aa5b471860321d7b47fd881898162b7/raw/"
            "6907bb3a38bfbb6fccf3a8b1edfb90e39714d14f/titanic_dataset.csv"
        )
        df = pd.read_csv(path)
        df = df[["Survived", "Sex", "Age", "SibSp", "Parch", "Fare", "Embarked"]].copy()
        df["Age"] = pd.to_numeric(df["Age"], errors="coerce")
        # Assign the column: `df.loc["Fare"] = ...` would instead append a
        # row labelled "Fare".
        df["Fare"] = pd.to_numeric(df["Fare"], errors="coerce")
        return df

    if name_data == "Artificial":
        city = "Wonderland"
        n_samples = 1000
        periods = [100, 20]
        amp_anomalies = 0.5
        ratio_anomalies = 0.05
        amp_noise = 0.1

        X, A, E = generate_artificial_ts(
            n_samples, periods, amp_anomalies, ratio_anomalies, amp_noise
        )
        signal = X + A + E
        df = pd.DataFrame({"signal": signal, "index": range(n_samples), "station": city})
        df.set_index(["station", "index"], inplace=True)

        # Keep the individual components next to the observed signal.
        df["X"] = X
        df["A"] = A
        df["E"] = E
        return df

    if name_data == "SNCF":
        path_file = os.path.join(datapath, "validations_idfm_std.parq")
        df = pd.read_parquet(path_file)
        # Keep the `n_groups_max` stations with the largest mean "val_in".
        sizes_stations = df.groupby("station")["val_in"].mean().sort_values()
        n_groups_max = min(len(sizes_stations), n_groups_max)
        stations = sizes_stations.index.get_level_values("station").unique()[-n_groups_max:]
        return df.loc[stations]

    if name_data == "Beijing_online":
        urllink = "https://archive.ics.uci.edu/static/public/501/"
        zipname = "beijing+multi+site+air+quality+data"

        list_df = download_data_from_zip(zipname, urllink, datapath=datapath)
        list_df = [preprocess_data_beijing(df) for df in list_df]
        return pd.concat(list_df)

    if name_data == "Superconductor_online":
        csv_url = (
            "https://huggingface.co/datasets/polinaeterna/"
            "tabular-benchmark/resolve/main/reg_num/superconduct.csv"
        )
        return pd.read_csv(csv_url, index_col=0)

    if name_data == "Monach_weather":
        # NOTE(review): download_data_from_zip appends "/<zipname>.zip" to
        # this link, so the file name ends up twice in the final URL -
        # confirm this is what the server expects.
        urllink = url_zenodo + "4654822/files/weather_dataset.zip?download=1"
        zipname = "weather_dataset"
        list_loaded_data = download_data_from_zip(zipname, urllink, datapath=datapath)
        return _monash_series_to_frame(
            list_loaded_data[0],
            suffix_column="series_type",
            freq="1D",
            fixed_start=pd.Timestamp("01/01/2010"),
        )

    if name_data == "Monach_electricity_australia":
        urllink = (
            url_zenodo
            + "4659727/files/australian_electricity_demand_dataset.zip?download=1"
        )
        zipname = "australian_electricity_demand_dataset"
        list_loaded_data = download_data_from_zip(zipname, urllink, datapath=datapath)
        return _monash_series_to_frame(
            list_loaded_data[0],
            suffix_column="state",
            freq="30min",
        )

    raise ValueError(f"Data name {name_data} is unknown!")


def preprocess_data_beijing(df: pd.DataFrame) -> pd.DataFrame:
    """Preprocess data from the "Beijing" dataset.

    The hourly records are indexed by ("station", "datetime"), the columns
    listed below are dropped and the remaining ones are averaged per day.
    The input dataframe is left untouched.

    Parameters
    ----------
    df : pd.DataFrame
        dataframe with the columns "year", "month", "day", "hour", "No",
        "cbwd", "Iws", "Is" and "Ir", plus the measurements to keep

    Returns
    -------
    pd.DataFrame
        daily means, indexed by ("station", "datetime") where "station" is
        always "Beijing"

    """
    # NOTE(review): the dropped columns presumably match the schema of the
    # UCI "Beijing PM2.5" files - confirm against the dataset actually
    # downloaded by the caller.
    dropped_columns = ["year", "month", "day", "hour", "No", "cbwd", "Iws", "Is", "Ir"]

    # `assign` returns a new frame, so the caller's dataframe is not mutated.
    hourly = df.assign(
        datetime=pd.to_datetime(df[["year", "month", "day", "hour"]]),
        station="Beijing",
    )
    hourly = (
        hourly.set_index(["station", "datetime"])
        .drop(columns=dropped_columns)
        .sort_index()
    )
    days = hourly.index.get_level_values("datetime").floor("D")
    return hourly.groupby(["station", days], group_keys=False).mean()


def add_holes(
    df: pd.DataFrame,
    ratio_masked: float,
    mean_size: int,
    random_state: RandomSetting = None,
) -> pd.DataFrame:
    """Create holes in a dataset with no missing value, starting from `df`.

    Only used in the documentation to design examples.

    Parameters
    ----------
    df : pd.DataFrame
        dataframe no missing values
    ratio_masked : float
        Targeted global proportion of nans added in the returned dataset
    mean_size : int
        Targeted mean size of the holes to add
    random_state: RandomSetting
        Random state for reproducibility

    Returns
    -------
    pd.DataFrame
        copy of `df` with missing values

    """
    # Index levels other than the time-like ones are used as groups.
    groups = df.index.names.difference(["datetime", "date", "index", None])

    generator_kwargs = {
        "ratio_masked": ratio_masked,
        "subset": df.columns,
        "random_state": random_state,
    }
    if groups != []:
        # `groups` is only passed when there is at least one grouping level.
        generator_kwargs["groups"] = groups
    generator = missing_patterns.GeometricHoleGenerator(1, **generator_kwargs)

    columns = df.columns
    generator.dict_probas_out = dict.fromkeys(columns, 1 / mean_size)
    generator.dict_ratios = {column: 1 / len(columns) for column in columns}

    if generator.groups:
        mask = df.groupby(groups, group_keys=False).apply(generator.generate_mask)
    else:
        mask = generator.generate_mask(df)

    X_with_nans = df.copy()
    X_with_nans[mask] = np.nan
    return X_with_nans
def get_data_corrupted(
    name_data: str = "Beijing",
    mean_size: int = 90,
    ratio_masked: float = 0.2,
    random_state: RandomSetting = None,
) -> pd.DataFrame:
    """Corrupt data.

    Return a dataframe with controlled corruption obtained from the source
    `name_data`: the data are loaded with `get_data` and holes are then
    inserted with `add_holes`.

    Parameters
    ----------
    name_data : str
        Name of the data source, can be "Beijing" or "Artificial"
    mean_size: int
        Mean size of the holes to be generated using a geometric law
    ratio_masked: float
        Percent of missing data in each column in the output dataframe
    random_state: RandomSetting
        Random state for reproducibility

    Returns
    -------
    pd.DataFrame
        Dataframe with missing values

    """
    df = get_data(name_data)
    return add_holes(
        df,
        mean_size=mean_size,
        ratio_masked=ratio_masked,
        random_state=random_state,
    )


def add_station_features(df: pd.DataFrame) -> pd.DataFrame:
    """Create one indicator feature per station in the dataset.

    Parameters
    ----------
    df : pd.DataFrame
        dataframe whose index has a level named "station"

    Returns
    -------
    pd.DataFrame
        copy of `df` with one extra column "station=<name>" per distinct
        station, equal to 1.0 on the rows of that station and 0.0 elsewhere

    """
    df = df.copy()
    stations = df.index.get_level_values("station")
    for station in stations.unique():
        df[f"station={station}"] = (stations == station).astype(float)
    return df


def add_datetime_features(df: pd.DataFrame, col_time: str = "datetime") -> pd.DataFrame:
    """Create seasonal features in the dataset with cosine and sine functions.

    The position of each timestamp within its year (day of year divided by
    the number of days of that year) is mapped onto the unit circle.

    Parameters
    ----------
    df : pd.DataFrame
        dataframe whose index has a datetime level named `col_time`
    col_time: string
        Column of the index containing the time index

    Returns
    -------
    pd.DataFrame
        copy of `df` with the extra columns "time_cos" and "time_sin"

    """
    df = df.copy()
    time = df.index.get_level_values(col_time).to_series()
    # Vectorised equivalent of the Gregorian leap-year rule.
    days_in_year = np.where(time.dt.is_leap_year.values, 366, 365)
    ratio = time.dt.dayofyear.values / days_in_year
    df["time_cos"] = np.cos(2 * np.pi * ratio)
    df["time_sin"] = np.sin(2 * np.pi * ratio)
    return df


def _parse_tsf_attribute(raw_value: str, col_type: str) -> Union[int, str, datetime]:
    """Convert one attribute field of a .tsf data line to its declared type."""
    if col_type == "numeric":
        return int(raw_value)
    if col_type == "string":
        return str(raw_value)
    if col_type == "date":
        return datetime.strptime(raw_value, "%Y-%m-%d %H-%M-%S")
    raise Exception("Invalid attribute type.")


def convert_tsf_to_dataframe(
    full_file_path_and_name: str,
    replace_missing_vals_with: Union[str, float, int] = "NaN",
    value_column_name: str = "series_value",
) -> pd.DataFrame:
    """Convert a .tsf file to a dataframe.

    The file is read line by line (encoding cp1252); blank lines and lines
    starting with "#" are skipped.

    * "@attribute <name> <type>" declares an attribute column, where the
      type is "numeric", "string" or "date" ("%Y-%m-%d %H-%M-%S").
    * any other "@..." line except "@data" must hold exactly two
      space-separated tokens and is otherwise ignored.
    * "@data" marks the start of the series; each following line holds the
      attribute values separated by ":" followed by the comma-separated
      values of the series, where "?" denotes a missing value.

    Parameters
    ----------
    full_file_path_and_name : str
        Filename
    replace_missing_vals_with : Union[str, float, int], optional
        Replace missing values with, by default "NaN"
    value_column_name : str, optional
        Name of the column containing the values, by default "series_value"

    Returns
    -------
    pd.DataFrame
        One row per series: one column per declared attribute, plus
        `value_column_name` holding the array of values of the series.

    Raises
    ------
    Exception
        If the file is empty or does not follow the layout described above.

    """
    col_names: List[str] = []
    col_types: List[str] = []
    all_data: Dict[str, List] = {}
    all_series: List = []
    line_count = 0
    found_data_tag = False
    found_data_section = False

    with open(full_file_path_and_name, "r", encoding="cp1252") as file:
        for raw_line in file:
            line = raw_line.strip()
            if not line:
                continue
            # Only non-blank lines are counted.
            line_count += 1

            if line.startswith("@"):
                if line.startswith("@data"):
                    if len(col_names) == 0:
                        raise Exception("Attribute section must come before data.")
                    found_data_tag = True
                    continue
                line_content = line.split(" ")
                if line.startswith("@attribute"):
                    if len(line_content) != 3:
                        raise Exception("Invalid meta-data specification.")
                    col_names.append(line_content[1])
                    col_types.append(line_content[2])
                elif len(line_content) != 2:
                    raise Exception("Invalid meta-data specification.")
                continue

            if line.startswith("#"):
                continue

            # From here on the line is a series.
            if len(col_names) == 0:
                raise Exception(" Attribute section must come before data.")
            if not found_data_tag:
                raise Exception("Missing @data tag.")

            if not found_data_section:
                # First series: create the attribute columns declared so far.
                found_data_section = True
                for col in col_names:
                    all_data[col] = []

            full_info = line.split(":")
            if len(full_info) != (len(col_names) + 1):
                raise Exception("Missing attributes/values in series.")

            raw_values = full_info[-1].split(",")
            # NOTE(review): str.split never returns an empty list, so this
            # guard cannot fire; an empty field fails in float() below.
            if len(raw_values) == 0:
                raise Exception(" Missing values should be indicated with ? symbol")

            numeric_series = [
                replace_missing_vals_with if val == "?" else float(val)
                for val in raw_values
            ]
            if numeric_series.count(replace_missing_vals_with) == len(numeric_series):
                raise Exception(
                    "At least one numeric value should be there in a series."
                )
            all_series.append(pd.Series(numeric_series).array)

            for col_name, col_type, raw_value in zip(col_names, col_types, full_info):
                all_data[col_name].append(_parse_tsf_attribute(raw_value, col_type))

    if line_count == 0:
        raise Exception("Empty file.")
    if len(col_names) == 0:
        raise Exception("Missing attribute section.")
    if not found_data_section:
        raise Exception("Missing series information under data section.")

    all_data[value_column_name] = all_series
    return pd.DataFrame(all_data)