#
# This file is part of the federated_learning_p2p (p2pfl) distribution (see https://github.com/pguijas/p2pfl).
# Copyright (c) 2022 Pedro Guijas Bravo.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""P2PFL dataset abstraction."""
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Sequence, Type, Union
import pandas as pd # type: ignore
from datasets import Dataset, DatasetDict, load_dataset # type: ignore
from p2pfl.learning.dataset.partition_strategies import DataPartitionStrategy
# Define the DataFiles type for clarity
DataFilesType = Optional[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]]
# TODO: Add tests covering both loading by splits and loading a single dataset.
class DataExportStrategy(ABC):
    """Abstract base class for export strategies."""

    @staticmethod
    @abstractmethod
    def export(data: Dataset, transforms: Optional[Callable] = None, **kwargs) -> Any:
        """
        Export the data using the specific strategy.

        Args:
            data: The data to export.
            transforms: The transforms to apply to the data.
            **kwargs: Additional keyword arguments for the export strategy.

        Returns:
            The exported data.

        """
        pass
class P2PFLDataset:
    """
    Handle various data sources for Peer-to-Peer Federated Learning (P2PFL).

    This class uses Hugging Face's `datasets.Dataset` as the intermediate representation for its flexibility and
    optimizations.

    Supported data sources:

    - CSV files
    - JSON files
    - Parquet files
    - Python dictionaries
    - Python lists
    - Pandas DataFrames
    - Hugging Face datasets
    - SQL databases

    To load different data sources, it is recommended to directly instantiate the `datasets.Dataset` object
    and pass it to the `P2PFLDataset` constructor.

    Example:
        Load data from various sources and create a `P2PFLDataset` object:

        .. code-block:: python

            from datasets import load_dataset, DatasetDict, concatenate_datasets

            # Load data from a CSV file
            dataset_csv = load_dataset("csv", data_files="data.csv")

            # Load from the Hub
            dataset_hub = load_dataset("squad", split="train")

            # Create the final dataset object
            p2pfl_dataset = P2PFLDataset(
                DatasetDict({
                    "train": concatenate_datasets([dataset_csv, dataset_hub]),
                    "test": dataset_json
                })
            )

    .. todo::
        Add more complex integrations (databricks, etc.)

    """

    def __init__(
        self,
        data: Union[Dataset, DatasetDict],
        train_split_name: str = "train",
        test_split_name: str = "test",
        transforms: Optional[Callable] = None,
    ):
        """
        Initialize the P2PFLDataset object.

        Args:
            data: The dataset to use.
            train_split_name: The name of the training split.
            test_split_name: The name of the test split.
            transforms: The transforms to apply to the data.

        """
        self._data = data
        self._train_split_name = train_split_name
        self._test_split_name = test_split_name
        self._transforms = transforms

    def get(self, idx, train: bool = True) -> Dict[str, Any]:
        """
        Get the item at the given index.

        Args:
            idx: The index of the item to retrieve.
            train: If True, get the item from the training split. Otherwise, get the item from the test split.

        Returns:
            The item at the given index.

        Raises:
            TypeError: If the underlying data is neither a Dataset nor a DatasetDict.

        """
        if isinstance(self._data, Dataset):
            data = self._data[idx]
        elif isinstance(self._data, DatasetDict):
            split = self._train_split_name if train else self._test_split_name
            data = self._data[split][idx]
        else:
            # Explicit failure instead of an UnboundLocalError on `data`.
            raise TypeError("Unsupported data type.")
        return data

    def generate_train_test_split(self, test_size: float = 0.2, seed: int = 42, shuffle: bool = True, **kwargs) -> None:
        """
        Generate a train/test split of the dataset.

        Args:
            test_size: The proportion of the dataset to include in the test split.
            seed: The random seed to use for reproducibility.
            shuffle: Whether to shuffle the data before splitting.
            **kwargs: Additional keyword arguments to pass to the train_test_split method.

        Raises:
            ValueError: If the dataset is already split (not a single Dataset).

        """
        if isinstance(self._data, Dataset):
            # Forward the split parameters; they were previously ignored, making
            # `test_size`, `seed` and `shuffle` silently ineffective.
            self._data = self._data.train_test_split(test_size=test_size, seed=seed, shuffle=shuffle, **kwargs)
        else:
            raise ValueError("Unsupported data type.")

    def get_num_samples(self, train: bool = True) -> int:
        """
        Get the number of samples in the dataset.

        Args:
            train: If True, get the number of samples in the training split. Otherwise, get the number of samples in the test split.

        Returns:
            The number of samples in the dataset.

        Raises:
            TypeError: If the underlying data is neither a Dataset nor a DatasetDict.

        """
        if isinstance(self._data, Dataset):
            return len(self._data)
        elif isinstance(self._data, DatasetDict):
            split = self._train_split_name if train else self._test_split_name
            return len(self._data[split])
        else:
            raise TypeError("Unsupported data type.")

    def generate_partitions(
        self, num_partitions: int, strategy: DataPartitionStrategy, seed: int = 666, label_tag: str = "label"
    ) -> List["P2PFLDataset"]:
        """
        Generate partitions of the dataset.

        Args:
            num_partitions: The number of partitions to generate.
            strategy: The partition strategy to use.
            seed: The random seed to use for reproducibility.
            label_tag: The tag to use for the label.

        Returns:
            An iterable of P2PFLDataset objects.

        Raises:
            ValueError: If the dataset has no train/test splits yet.

        """
        if isinstance(self._data, Dataset):
            raise ValueError("Cannot generate partitions for single datasets. ")
        train_partition_idxs, test_partition_idxs = strategy.generate_partitions(
            self._data[self._train_split_name],
            self._data[self._test_split_name],
            num_partitions,
            seed=seed,
            label_tag=label_tag,
        )
        # Each partition keeps the same split names as the parent dataset.
        return [
            P2PFLDataset(
                DatasetDict(
                    {
                        self._train_split_name: self._data[self._train_split_name].select(train_partition_idxs[i]),
                        self._test_split_name: self._data[self._test_split_name].select(test_partition_idxs[i]),
                    }
                )
            )
            for i in range(num_partitions)
        ]

    def export(
        self,
        strategy: Type[DataExportStrategy],
        train: bool = True,
        **kwargs,
    ) -> Any:
        """
        Export the dataset using the given strategy.

        Args:
            strategy: The export strategy to use.
            train: If True, export the training data. Otherwise, export the test data.
            **kwargs: Additional keyword arguments for the export strategy.

        Returns:
            The exported data.

        Raises:
            ValueError: If the dataset has no train/test splits yet.

        """
        # Checks
        if isinstance(self._data, Dataset):
            raise ValueError("Cannot export single datasets. Need to generate train/test splits first.")

        # Export
        split = self._train_split_name if train else self._test_split_name
        return strategy.export(self._data[split], transforms=self._transforms, **kwargs)

    @classmethod
    def from_csv(cls, data_files: DataFilesType, **kwargs) -> "P2PFLDataset":
        """
        Create a P2PFLDataset from a CSV file.

        Args:
            data_files: The path to the CSV file or a list of paths to CSV files.
            **kwargs: Keyword arguments to pass to datasets.load_dataset.

        Returns:
            A P2PFLDataset object.

        """
        dataset = load_dataset("csv", data_files=data_files, **kwargs)
        return cls(dataset)

    @classmethod
    def from_json(cls, data_files: DataFilesType, **kwargs) -> "P2PFLDataset":
        """
        Create a P2PFLDataset from a JSON file.

        Args:
            data_files: The path to the JSON file or a list of paths to JSON files.
            **kwargs: Keyword arguments to pass to datasets.load_dataset.

        Returns:
            A P2PFLDataset object.

        """
        dataset = load_dataset("json", data_files=data_files, **kwargs)
        return cls(dataset)

    @classmethod
    def from_parquet(cls, data_files: DataFilesType, **kwargs) -> "P2PFLDataset":
        """
        Create a P2PFLDataset from a Parquet file or files.

        Args:
            data_files: The path to the Parquet file or a list of paths to Parquet files.
            **kwargs: Keyword arguments to pass to datasets.load_dataset.

        Returns:
            A P2PFLDataset object.

        """
        dataset = load_dataset("parquet", data_files=data_files, **kwargs)
        return cls(dataset)

    @classmethod
    def from_pandas(cls, df: pd.DataFrame) -> "P2PFLDataset":
        """
        Create a P2PFLDataset from a Pandas DataFrame.

        Args:
            df: A Pandas DataFrame containing the data.

        Returns:
            A P2PFLDataset object.

        """
        dataset = Dataset.from_pandas(df)
        return cls(dataset)

    @classmethod
    def from_huggingface(cls, dataset_name: str, **kwargs) -> "P2PFLDataset":
        """
        Create a P2PFLDataset from a Hugging Face dataset.

        Args:
            dataset_name: The name of the Hugging Face dataset.
            **kwargs: Keyword arguments to pass to datasets.load_dataset.

        Returns:
            A P2PFLDataset object.

        """
        dataset = load_dataset(dataset_name, **kwargs)
        return cls(dataset)

    @classmethod
    def from_generator(cls, generator: Callable[[], Iterable[Dict[str, Any]]]) -> "P2PFLDataset":
        """
        Create a P2PFLDataset from a generator function.

        Args:
            generator: A generator function that yields dictionaries.

        Returns:
            A P2PFLDataset object.

        """
        dataset = Dataset.from_generator(generator)
        return cls(dataset)