p2pfl.learning.aggregators package

Aggregation algorithms for P2PFL.

Base classes:
  • Aggregator: Abstract base class for all aggregators

  • WeightAggregator: Base class for neural network aggregators

  • TreeAggregator: Base class for tree ensemble aggregators

Standard aggregators:
  • FedAvg: Federated Averaging

  • FedProx: Federated Proximal

  • FedMedian: Federated Median

  • Krum: Byzantine-resilient aggregation

  • Scaffold: SCAFFOLD algorithm

  • FedOptBase, FedAdagrad, FedAdam, FedYogi: Adaptive optimization

  • SequentialLearning: Sequential model passing (any model type)

Tree-based aggregators:
  • FedXgbBagging: XGBoost bagging aggregation

Utility functions:
  • get_default_aggregator: Select default aggregator based on model type

class p2pfl.learning.aggregators.Aggregator(disable_partial_aggregation=False)[source]

Bases: NodeComponent

Abstract base class for all aggregators.

Important

We do not recommend inheriting directly from this class. Instead, inherit from:
  • WeightAggregator: For neural network aggregation (FedAvg, etc.)

  • TreeAggregator: For tree ensemble aggregation (FedXgbBagging, etc.)

Parameters:

disable_partial_aggregation (bool) – Whether to disable partial aggregation.

SUPPORTS_PARTIAL_AGGREGATION

Whether partial aggregation is supported.

SUPPORTS_PARTIAL_AGGREGATION: bool = False
add_model(model)[source]

Add a model. Adding the first model starts the aggregation run (with a timeout).

Parameters:

model (P2PFLModel) – Model to add.

Return type:

list[str]

Returns:

List of contributors.

aggregate(models)[source]

Validate and aggregate the models.

Automatically calls validate_models() before delegating to _aggregate().

Parameters:

models (list[P2PFLModel]) – List of models to aggregate.

Return type:

P2PFLModel

Returns:

The aggregated model.

clear()[source]

Clear the aggregation (remove trainset and release locks).

Return type:

None

get_aggregated_models()[source]

Get the list of aggregated models.

Return type:

list[str]

Returns:

Names of the nodes that collaborated to produce the model.

get_missing_models()[source]

Obtain missing models for the aggregation.

Return type:

set

Returns:

A set of missing models.

get_model(except_nodes)[source]

Get the appropriate aggregation result, depending on whether the aggregator supports partial aggregation.

Parameters:

except_nodes – List of nodes to exclude from the aggregation.

Return type:

P2PFLModel

get_required_callbacks()[source]

Get the required callbacks for the aggregation.

Return type:

list[str]

Returns:

List of required callbacks.

set_nodes_to_aggregate(nodes_to_aggregate)[source]

Set the list of node names to aggregate. Be careful: setting new nodes discards the current aggregation.

Parameters:

nodes_to_aggregate (list[str]) – List of nodes to aggregate. Empty for no aggregation.

Raises:

RuntimeError – If the aggregation is running.

Return type:

None

validate_models(models)[source]

Validate that all models are compatible with this aggregator.

Parameters:

models (list[P2PFLModel]) – List of models to validate.

Raises:

IncompatibleModelError – If any model is incompatible with this aggregator.

Return type:

None

wait_and_get_aggregation(timeout=300)[source]

Wait for aggregation to finish.

Parameters:

timeout (int) – Timeout in seconds.

Return type:

P2PFLModel

Returns:

Aggregated model.

Raises:

Exception – If several models were received while waiting for the aggregated model.

class p2pfl.learning.aggregators.FedAdagrad(eta=0.1, beta_1=0.9, tau=1e-09, disable_partial_aggregation=False)[source]

Bases: FedOptBase

FedAdagrad - Adaptive Federated Optimization using Adagrad [Reddi et al., 2020].

FedAdagrad adapts the Adagrad optimizer to federated settings, maintaining adaptive learning rates on the server side based on accumulated squared gradients.

Paper: https://arxiv.org/abs/2003.00295

Parameters:
  • eta (float)

  • beta_1 (float)

  • tau (float)

  • disable_partial_aggregation (bool)

class p2pfl.learning.aggregators.FedAdam(eta=0.1, beta_1=0.9, beta_2=0.99, tau=1e-09, disable_partial_aggregation=False)[source]

Bases: FedOptBase

FedAdam - Adaptive Federated Optimization using Adam [Reddi et al., 2020].

FedAdam adapts the Adam optimizer to federated settings, maintaining both momentum and adaptive learning rates on the server side.

Paper: https://arxiv.org/abs/2003.00295

Parameters:
  • eta (float)

  • beta_1 (float)

  • beta_2 (float)

  • tau (float)

  • disable_partial_aggregation (bool)

class p2pfl.learning.aggregators.FedAvg(disable_partial_aggregation=False)[source]

Bases: WeightAggregator

Federated Averaging (FedAvg) [McMahan et al., 2016].

Inherits from WeightAggregator as FedAvg works with neural network weight tensors that can be averaged.

Paper: https://arxiv.org/abs/1602.05629

Parameters:

disable_partial_aggregation (bool)

SUPPORTS_PARTIAL_AGGREGATION: bool = True
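
The core of FedAvg is a sample-count-weighted average of parameter tensors. A minimal illustrative sketch with NumPy (this helper is not the library's implementation; names and signature are assumed):

```python
import numpy as np

def fedavg(weights: list[list[np.ndarray]], num_samples: list[int]) -> list[np.ndarray]:
    """Sample-weighted average of per-client parameter lists (illustrative)."""
    total = sum(num_samples)
    return [
        sum(w[i] * (n / total) for w, n in zip(weights, num_samples))
        for i in range(len(weights[0]))
    ]

# Two clients, one layer each; client b holds twice as much data as client a.
a = [np.array([0.0, 0.0])]
b = [np.array([3.0, 3.0])]
avg = fedavg([a, b], [1, 2])  # -> [array([2., 2.])]
```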
class p2pfl.learning.aggregators.FedMedian(disable_partial_aggregation=False)[source]

Bases: WeightAggregator

Federated Median (FedMedian) [Yin et al., 2018].

Inherits from WeightAggregator as FedMedian works with neural network weight tensors.

Paper: https://arxiv.org/pdf/1803.01498v1.pdf

Parameters:

disable_partial_aggregation (bool)

SUPPORTS_PARTIAL_AGGREGATION: bool = False
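
FedMedian replaces the weighted mean with a coordinate-wise median, which limits the influence of outlying clients. An illustrative sketch (not the library's implementation):

```python
import numpy as np

def fedmedian(weights: list[list[np.ndarray]]) -> list[np.ndarray]:
    """Coordinate-wise median across clients for each parameter tensor (illustrative)."""
    return [np.median(np.stack(layer), axis=0) for layer in zip(*weights)]

clients = [
    [np.array([1.0, 10.0])],
    [np.array([2.0, 20.0])],
    [np.array([100.0, 30.0])],  # outlier in the first coordinate
]
med = fedmedian(clients)  # -> [array([ 2., 20.])]: the outlier is ignored
```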
class p2pfl.learning.aggregators.FedOptBase(eta=0.1, beta_1=0.9, tau=1e-09, disable_partial_aggregation=False)[source]

Bases: FedAvg

Base class for Federated Optimization (FedOpt) family [Reddi et al., 2020].

Inherits from FedAvg (which inherits from WeightAggregator) as FedOpt algorithms work with neural network weight tensors.

This class extends FedAvg to provide common functionality for adaptive federated optimization algorithms like FedAdagrad, FedAdam, and FedYogi.

Paper: https://arxiv.org/abs/2003.00295

Parameters:
  • eta (float)

  • beta_1 (float)

  • tau (float)

  • disable_partial_aggregation (bool)

SUPPORTS_PARTIAL_AGGREGATION: bool = False
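
The FedOpt family treats the difference between the averaged client model and the current global model as a pseudo-gradient and applies a server-side optimizer step. A sketch of the three variants' update rules from Reddi et al., 2020 (illustrative, not the library's implementation):

```python
import numpy as np

def fedopt_step(x, avg, m, v, eta=0.1, beta_1=0.9, beta_2=0.99, tau=1e-9, variant="adam"):
    """One server-side FedOpt update (Reddi et al., 2020), illustrative.

    x: current global params, avg: FedAvg of the client models,
    m / v: server-side momentum and second-moment state.
    """
    delta = avg - x                            # pseudo-gradient
    m = beta_1 * m + (1 - beta_1) * delta      # momentum (all variants)
    if variant == "adagrad":
        v = v + delta**2                       # FedAdagrad: accumulate
    elif variant == "yogi":
        v = v - (1 - beta_2) * delta**2 * np.sign(v - delta**2)  # FedYogi
    else:
        v = beta_2 * v + (1 - beta_2) * delta**2                 # FedAdam
    x = x + eta * m / (np.sqrt(v) + tau)       # adaptive server step
    return x, m, v

# One Adam-style step from x=0 toward avg=1; x moves to roughly 0.1.
x, m, v = fedopt_step(np.array([0.0]), np.array([1.0]), np.array([0.0]), np.array([0.0]))
```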
class p2pfl.learning.aggregators.FedProx(proximal_mu=0.01, disable_partial_aggregation=False)[source]

Bases: FedAvg

FedProx - Federated Proximal [Li et al., 2018].

Inherits from FedAvg (which inherits from WeightAggregator) as FedProx works with neural network weight tensors.

FedProx extends FedAvg by adding a proximal term to the local objective function to handle system and statistical heterogeneity.

Note

Requires the fedprox callback (e.g., FedProxCallback for PyTorch).

  • Aggregator passes proximal_mu via additional_info

  • Callback snapshots model params at training start as the global reference

  • During training, the callback adds the proximal penalty (μ/2)·‖w − w_global‖² to the local loss (its gradient is μ·(w − w_global))

Paper: https://arxiv.org/abs/1812.06127

Parameters:
  • proximal_mu (float)

  • disable_partial_aggregation (bool)

get_required_callbacks()[source]

Get the required callbacks for FedProx.

Return type:

list[str]

Returns:

List of required callbacks.
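
The proximal term the callback adds to the local objective can be sketched directly from the FedProx paper; this standalone helper is illustrative only (the real logic lives in the framework-specific callback):

```python
import numpy as np

def proximal_penalty(w: list[np.ndarray], w_global: list[np.ndarray], mu: float = 0.01) -> float:
    """FedProx proximal term added to the local loss: (mu/2) * ||w - w_global||^2."""
    return 0.5 * mu * sum(float(np.sum((wi - gi) ** 2)) for wi, gi in zip(w, w_global))

w = [np.array([1.0, 2.0])]
w_g = [np.array([0.0, 0.0])]
penalty = proximal_penalty(w, w_g)  # 0.5 * 0.01 * (1 + 4) = 0.025
```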

class p2pfl.learning.aggregators.FedXgbBagging(disable_partial_aggregation=False)[source]

Bases: TreeAggregator

Federated XGBoost Bagging Aggregator.

Inherits from TreeAggregator as this aggregator works with tree-based models (XGBoost) using bagging aggregation.

Implements bagging-based aggregation for XGBoost models in federated learning. Trees from different clients are combined into a single ensemble.

SUPPORTS_PARTIAL_AGGREGATION

Whether partial aggregation is supported.

Type:

bool

Parameters:

disable_partial_aggregation (bool)

SUPPORTS_PARTIAL_AGGREGATION: bool = True
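
At its core, bagging aggregation for XGBoost just concatenates every client's trees into one larger ensemble. A structural sketch using hypothetical JSON-dict trees (the real aggregator works on serialized XGBoost models):

```python
def bag_trees(client_ensembles: list[list[dict]]) -> list[dict]:
    """Bagging-style aggregation sketch: the combined ensemble is the
    concatenation of every client's trees (hypothetical dict-encoded trees)."""
    combined: list[dict] = []
    for trees in client_ensembles:
        combined.extend(trees)
    return combined

# Two clients with 2 and 3 trees -> a single 5-tree ensemble.
ens = bag_trees([[{"id": 0}, {"id": 1}], [{"id": 0}, {"id": 1}, {"id": 2}]])
```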
class p2pfl.learning.aggregators.FedYogi(eta=0.01, beta_1=0.9, beta_2=0.99, tau=0.001, disable_partial_aggregation=False)[source]

Bases: FedOptBase

FedYogi - Adaptive Federated Optimization using Yogi [Reddi et al., 2020].

FedYogi adapts the Yogi optimizer to federated settings, maintaining adaptive learning rates on the server side to handle heterogeneous data distributions.

Paper: https://arxiv.org/abs/2003.00295

Parameters:
  • eta (float)

  • beta_1 (float)

  • beta_2 (float)

  • tau (float)

  • disable_partial_aggregation (bool)

exception p2pfl.learning.aggregators.IncompatibleModelError[source]

Bases: Exception

Exception raised when a model type is incompatible with the aggregator.

class p2pfl.learning.aggregators.Krum(disable_partial_aggregation=False)[source]

Bases: WeightAggregator

Krum [Blanchard et al., 2017].

Inherits from WeightAggregator as Krum works with neural network weight tensors.

Paper: https://arxiv.org/pdf/1703.02757

Parameters:

disable_partial_aggregation (bool)

SUPPORTS_PARTIAL_AGGREGATION: bool = False
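
Krum scores each candidate model by the sum of squared distances to its n − f − 2 nearest peers and selects the lowest-scoring one, so up to f Byzantine models cannot drag the result away. An illustrative sketch on flattened parameter vectors (not the library's implementation):

```python
import numpy as np

def krum_select(models: list[np.ndarray], f: int = 0) -> int:
    """Return the index of the Krum-selected model (Blanchard et al., 2017), illustrative.

    Score = sum of squared distances to the n - f - 2 closest other models;
    the model with the lowest score wins."""
    n = len(models)
    k = max(n - f - 2, 1)
    scores = []
    for i, mi in enumerate(models):
        dists = sorted(float(np.sum((mi - mj) ** 2)) for j, mj in enumerate(models) if j != i)
        scores.append(sum(dists[:k]))
    return int(np.argmin(scores))

flat = [np.array([0.0]), np.array([0.1]), np.array([0.2]), np.array([10.0])]  # one outlier
idx = krum_select(flat, f=1)  # the outlier (index 3) is never selected
```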
exception p2pfl.learning.aggregators.NoModelsToAggregateError[source]

Bases: Exception

Exception raised when there are no models to aggregate.

class p2pfl.learning.aggregators.Scaffold(global_lr=0.1, disable_partial_aggregation=False)[source]

Bases: WeightAggregator

SCAFFOLD Aggregator [Karimireddy et al., 2019].

Inherits from WeightAggregator as SCAFFOLD works with neural network weight tensors.

Paper: https://arxiv.org/pdf/1910.06378

The aggregator acts like the server in centralized learning, handling both model and control variate updates.

Because the environment is fully decentralized, the aggregator also maintains a global model, which consumes additional bandwidth.

Note

This aggregator requires the scaffold callback to be registered. The callback computes delta_y_i (model update) and delta_c_i (control variate update) which are passed via additional_info. See ScaffoldCallback for PyTorch/TensorFlow implementations.

Todo

Improve efficiency by sharing the global model only every n rounds.

Parameters:
  • global_lr (float)

  • disable_partial_aggregation (bool)

REQUIRED_INFO_KEYS = ['delta_y_i', 'delta_c_i']
SUPPORTS_PARTIAL_AGGREGATION: bool = False
get_required_callbacks()[source]

Retrieve the list of required callback keys for this aggregator.

Return type:

list[str]
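
The server-side half of SCAFFOLD applies the client deltas to the global model and control variate. A sketch of that step (illustrative; the real aggregator consumes the delta_y_i / delta_c_i values the callback ships via additional_info, and full participation is assumed here):

```python
import numpy as np

def scaffold_server_step(x, c, delta_ys, delta_cs, global_lr=0.1):
    """SCAFFOLD server-side update (Karimireddy et al., 2019), illustrative.

    x: global model, c: global control variate,
    delta_ys / delta_cs: per-client model and control-variate deltas."""
    x = x + global_lr * np.mean(delta_ys, axis=0)  # x <- x + eta_g * mean(delta_y_i)
    c = c + np.mean(delta_cs, axis=0)              # full participation assumed
    return x, c

new_x, new_c = scaffold_server_step(
    np.array([0.0]), np.array([0.0]),
    [np.array([1.0]), np.array([3.0])],   # mean delta_y = 2 -> x moves by 0.2
    [np.array([0.5]), np.array([1.5])],   # mean delta_c = 1 -> c becomes 1
)
```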

class p2pfl.learning.aggregators.SequentialLearning(disable_partial_aggregation=False)[source]

Bases: Aggregator

Sequential Learning Aggregator - passes a single model through unchanged.

In sequential learning (also known as cyclic learning), only one client participates per round and the model is passed sequentially between clients. This aggregator simply passes through the received model without modification.

Use cases:
  • Cyclic federated learning where clients train one after another

  • Ring topologies where the model circulates through all nodes

  • Any scenario requiring pass-through aggregation without modification

Unlike WeightAggregator or TreeAggregator, this aggregator accepts any model type (neural networks, tree ensembles, etc.) since it performs no actual aggregation, just model forwarding.

Note

This aggregator expects exactly one model per aggregation round. Passing multiple models will raise a ValueError.

Example

>>> aggregator = SequentialLearning()
>>> aggregator.set_addr("node1")
>>> result = aggregator.aggregate([single_model])
Parameters:

disable_partial_aggregation (bool)

SUPPORTS_PARTIAL_AGGREGATION: bool = True
class p2pfl.learning.aggregators.TreeAggregator(disable_partial_aggregation=False)[source]

Bases: Aggregator

Base class for aggregators that work with tree ensemble models.

Inherit from this class for aggregators that:
  • Combine trees via bagging, boosting, or cycling

  • Work with XGBoost models

  • Expect serialized tree structures

The validation is automatic via the template pattern: aggregate() calls validate_models() before delegating to _aggregate().

Example

>>> class MyTreeAggregator(TreeAggregator):
...     def _aggregate(self, models):
...         # Validation already done by aggregate()
...         # ... your tree combination logic
...         pass
Parameters:

disable_partial_aggregation (bool)

class p2pfl.learning.aggregators.WeightAggregator(disable_partial_aggregation=False)[source]

Bases: Aggregator

Base class for aggregators that work with neural network models.

Inherit from this class for aggregators that:
  • Average or combine weight tensors

  • Work with PyTorch, TensorFlow, Flax models

  • Expect list[np.ndarray] of float32/float64 parameter arrays

The validation is automatic via the template pattern: aggregate() calls validate_models() before delegating to _aggregate().

Example

>>> class MyAggregator(WeightAggregator):
...     def _aggregate(self, models):
...         # Validation already done by aggregate()
...         # ... your averaging logic
...         pass
Parameters:

disable_partial_aggregation (bool)

p2pfl.learning.aggregators.get_default_aggregator(model)[source]

Select the appropriate default aggregator based on model type.

Parameters:

model (P2PFLModel) – The model to determine aggregator for.

Return type:

WeightAggregator | TreeAggregator

Returns:

FedXgbBagging for tree-based models, FedAvg for weight-based models.
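
The dispatch rule stated above can be mirrored in a tiny self-contained sketch (the real function inspects the P2PFLModel type; the string tag here is a hypothetical stand-in):

```python
def default_aggregator_for(model_kind: str) -> str:
    """Illustrative dispatch mirroring get_default_aggregator's contract:
    tree-based models get FedXgbBagging, weight-based models get FedAvg."""
    return "FedXgbBagging" if model_kind == "tree" else "FedAvg"
```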
