p2pfl.communication.protocols.protobuff.protobuff_communication_protocol module
GRPC communication protocol.
- class p2pfl.communication.protocols.protobuff.protobuff_communication_protocol.ProtobuffCommunicationProtocol(commands=None)
Bases: CommunicationProtocol
Protobuff communication protocol.
- Parameters:
addr – Address of the node.
commands (list[Command] | None) – Commands to add to the communication protocol.
Todo
Decouple the heartbeat command.
- broadcast(msg, node_list=None)
Broadcast a message to all neighbors.
- Parameters:
msg (RootMessage) – The message to broadcast.
node_list (list[str] | None) – Optional node list.
- Return type:
None
- build_msg(cmd, args=None, round=None, direct=False)
Build a RootMessage to send to the neighbors.
- Parameters:
cmd (str) – Command of the message.
args (list[str] | None) – Arguments of the message.
round (int | None) – Round of the message.
direct (bool) – Whether the message is a direct message.
- Return type:
RootMessage
- Returns:
RootMessage to send.
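The call shape of build_msg can be illustrated without a running node. The stub below is a hypothetical stand-in, not the p2pfl implementation: it only mirrors the documented signature and defaults. `StubProtocol`, the dictionary that stands in for RootMessage, and the command name `"example_cmd"` are all assumptions made for illustration.

```python
from __future__ import annotations

from typing import Any, Optional


class StubProtocol:
    """Hypothetical stand-in that mirrors the documented build_msg signature."""

    def build_msg(
        self,
        cmd: str,
        args: Optional[list[str]] = None,
        round: Optional[int] = None,
        direct: bool = False,
    ) -> dict[str, Any]:
        # Assumption: a plain dict stands in for RootMessage here.
        return {
            "cmd": cmd,
            "args": list(args) if args is not None else [],
            "round": round,
            "direct": direct,
        }


proto = StubProtocol()
# Only cmd is required; args, round and direct fall back to their defaults.
msg = proto.build_msg("example_cmd", args=["node-a", "3"], round=2)
minimal = proto.build_msg("example_cmd")
```

With the real class, the same keyword arguments would be passed to build_msg and the result handed to send or broadcast.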
- build_weights(cmd, round, serialized_model, contributors=None, weight=1)
Build a RootMessage with a Weights payload to send to the neighbors.
- Parameters:
cmd (str) – Command of the message.
round (int) – Round of the message.
serialized_model (bytes) – Serialized model to send.
contributors (list[str] | None) – List of contributors.
weight (int) – Weight of the message (number of samples).
- Return type:
RootMessage
- Returns:
RootMessage to send.
- connect(addr, non_direct=False)
Connect to a neighbor.
- Parameters:
addr (str) – The address to connect to.
non_direct (bool) – The non direct flag.
- Return type:
bool
- disconnect(nei, disconnect_msg=True)
Disconnect from a neighbor.
- Parameters:
nei (str) – The neighbor to disconnect from.
disconnect_msg (bool) – The disconnect message flag.
- Return type:
None
- get_neighbors(only_direct=False)
Get the neighbors.
- Parameters:
only_direct (bool) – The only direct flag.
- Return type:
dict[str, Any]
- gossip_weights(early_stopping_fn, get_candidates_fn, status_fn, model_fn, period=None, create_connection=False)
Gossip model weights.
- Parameters:
early_stopping_fn (Callable[[], bool]) – The early stopping function.
get_candidates_fn (Callable[[], list[str]]) – The get candidates function.
status_fn (Callable[[], Any]) – The status function.
model_fn (Callable[[str], tuple[Any, str, int, list[str]]]) – The model function.
period (float | None) – The period.
create_connection (bool) – The create connection flag.
- Return type:
None
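The four callbacks are described only by their type signatures, so a sketch of functions with matching shapes may help. Everything below is hypothetical: the toy state, the placeholder tuple returned by `model_fn` (the meaning of each element is not documented here), and `toy_gossip`, a stand-in driver that is not the p2pfl gossip algorithm and exists only to show how callables of these types could be consumed.

```python
from __future__ import annotations

from typing import Any

# Toy state, entirely hypothetical.
pending: list[str] = ["node-b", "node-c"]
sent: list[tuple[str, str, int]] = []


def early_stopping_fn() -> bool:
    # Callable[[], bool]: stop once no candidate is left.
    return not pending


def get_candidates_fn() -> list[str]:
    # Callable[[], list[str]]: return a copy so callers can mutate state safely.
    return list(pending)


def status_fn() -> Any:
    # Callable[[], Any]: any snapshot of the current state.
    return tuple(pending)


def model_fn(nei: str) -> tuple[Any, str, int, list[str]]:
    # Callable[[str], tuple[Any, str, int, list[str]]]: placeholder values only.
    return (b"weights", "example_cmd", 1, ["node-a"])


def toy_gossip(early_stopping_fn, get_candidates_fn, status_fn, model_fn) -> None:
    """Hypothetical driver; NOT the p2pfl gossip algorithm."""
    while not early_stopping_fn():
        status_fn()  # a real driver might compare snapshots between rounds
        for nei in get_candidates_fn():
            _payload, cmd, rnd, _contributors = model_fn(nei)
            sent.append((nei, cmd, rnd))
            pending.remove(nei)


toy_gossip(early_stopping_fn, get_candidates_fn, status_fn, model_fn)
```

With the real protocol object, the same four functions would be passed to gossip_weights, optionally together with period and create_connection.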
- send(nei, msg, raise_error=False, remove_on_error=True)
Send a message to a neighbor.
- Parameters:
nei (str) – The neighbor to send the message to.
msg (RootMessage) – The message to send.
raise_error (bool) – Whether to raise an error on failure.
remove_on_error (bool) – Whether to remove on error.
- Return type:
None
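The two error flags of send are easier to reason about with a small model. The sketch below is one plausible reading of the parameter names, not the documented behavior: it assumes that a failed send drops the neighbor when remove_on_error is set and re-raises when raise_error is set. `StubSender`, `_transmit`, and the use of `ConnectionError` are hypothetical.

```python
from __future__ import annotations


class StubSender:
    """Hypothetical stand-in; not the p2pfl implementation."""

    def __init__(self, neighbors: set[str]) -> None:
        self.neighbors = set(neighbors)

    def _transmit(self, nei: str, msg: str) -> None:
        # Assumption: names starting with "down-" simulate unreachable nodes.
        if nei not in self.neighbors or nei.startswith("down-"):
            raise ConnectionError(f"cannot reach {nei}")

    def send(self, nei: str, msg: str, raise_error: bool = False,
             remove_on_error: bool = True) -> None:
        try:
            self._transmit(nei, msg)
        except ConnectionError:
            if remove_on_error:
                # Assumed meaning: forget the neighbor after a failed send.
                self.neighbors.discard(nei)
            if raise_error:
                raise


sender = StubSender({"node-b", "down-c"})
sender.send("down-c", "hello")  # defaults: error swallowed, neighbor dropped
```

Under these assumptions, the defaults favor a node that keeps running and prunes unreachable peers, while raise_error=True suits callers that need to react to a failed delivery themselves.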