p2pfl.management.message_storage module

Message storage for logging communication events.

class p2pfl.management.message_storage.MessageStorage(disable_locks=False)

Bases: object

Message storage. Stores communication events between nodes.

Format:

add_message(node, direction, cmd, source_dest, package_type, package_size, round_num=None, additional_info=None)

Add a message entry to the storage.

Parameters:
  • node (str) – The node address.

  • direction (str) – Direction of communication ("sent" or "received").

  • cmd (str) – The command or message type.

  • source_dest (str) – Source (if receiving) or destination (if sending) node.

  • package_type (str) – Type of package ("message" or "weights").

  • package_size (int) – Size of the package in bytes (if available).

  • round_num (Optional[int]) – The federated learning round number (if applicable).

  • additional_info (Optional[Dict[str, Any]]) – Additional information as a dictionary.

Return type:

None
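
The following is a minimal sketch of how an entry with these parameters could be recorded. `SketchMessageStorage` is a hypothetical stand-in written for illustration, not the p2pfl implementation: the flat list of dictionaries, the key names, and the interpretation of `disable_locks` as "skip synchronization" are all assumptions.

```python
import threading
from typing import Any, Dict, List, Optional


class SketchMessageStorage:
    """Hypothetical stand-in for illustration; not the p2pfl class."""

    def __init__(self, disable_locks: bool = False) -> None:
        # Assumption: entries are kept in a flat list, in insertion order.
        self._entries: List[Dict[str, Any]] = []
        # Assumption: disable_locks=True means no synchronization at all.
        self._lock: Optional[threading.Lock] = (
            None if disable_locks else threading.Lock()
        )

    def add_message(
        self,
        node: str,
        direction: str,
        cmd: str,
        source_dest: str,
        package_type: str,
        package_size: int,
        round_num: Optional[int] = None,
        additional_info: Optional[Dict[str, Any]] = None,
    ) -> None:
        # Key names mirror the parameter names; the real keys may differ.
        entry = {
            "node": node,
            "direction": direction,
            "cmd": cmd,
            "source_dest": source_dest,
            "package_type": package_type,
            "package_size": package_size,
            "round_num": round_num,
            "additional_info": additional_info or {},
        }
        if self._lock is None:
            self._entries.append(entry)
        else:
            with self._lock:
                self._entries.append(entry)


storage = SketchMessageStorage()
storage.add_message(
    node="node-a",          # hypothetical address
    direction="sent",
    cmd="vote",             # hypothetical command name
    source_dest="node-b",
    package_type="message",
    package_size=128,
    round_num=1,
)
```

The optional arguments default to `None`, so a message that is not tied to a training round can be logged without `round_num`.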

get_messages(node=None, direction=None, cmd=None, round_num=None, limit=None)

Get messages with optional filtering.

Parameters:
  • node (Optional[str]) – Filter by node address (as source or destination) (optional).

  • direction (Optional[str]) – Filter by direction ("sent" or "received") (optional).

  • cmd (Optional[str]) – Filter by command type (optional).

  • round_num (Optional[int]) – Filter by round number (optional).

  • limit (Optional[int]) – Limit the number of messages returned (optional).

Return type:

List[dict[str, Any]]

Returns:

A list of message dictionaries matching the filters.
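
The filtering described above can be sketched as a standalone function over a list of dictionaries. `filter_messages` is a hypothetical helper, not part of p2pfl. It assumes that a filter left as `None` is ignored, that `node` is compared against a `node` key on each entry, and that `limit` keeps the first matches in stored order; the page does not confirm any of these details.

```python
from typing import Any, Dict, List, Optional


def filter_messages(
    entries: List[Dict[str, Any]],
    node: Optional[str] = None,
    direction: Optional[str] = None,
    cmd: Optional[str] = None,
    round_num: Optional[int] = None,
    limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """Hypothetical filter: keep entries that match every filter that is set."""
    wanted = {
        "node": node,
        "direction": direction,
        "cmd": cmd,
        "round_num": round_num,
    }
    # Filters left as None are treated as "match anything" (assumption).
    active = {key: value for key, value in wanted.items() if value is not None}
    matches = [
        entry
        for entry in entries
        if all(entry.get(key) == value for key, value in active.items())
    ]
    # Assumption: limit keeps the first `limit` matches in stored order.
    return matches if limit is None else matches[:limit]


# Hypothetical sample entries; addresses and command names are made up.
entries = [
    {"node": "node-a", "direction": "sent", "cmd": "beat", "round_num": None},
    {"node": "node-a", "direction": "received", "cmd": "vote", "round_num": 1},
    {"node": "node-b", "direction": "sent", "cmd": "vote", "round_num": 1},
    {"node": "node-a", "direction": "sent", "cmd": "vote", "round_num": 1},
]

sent_by_a = filter_messages(entries, node="node-a", direction="sent")
first_vote = filter_messages(entries, cmd="vote", round_num=1, limit=1)
```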

get_received_messages(node=None, cmd=None, round_num=None, limit=None)

Get received messages with optional filtering.

Parameters:
  • node (Optional[str]) – Filter by destination node address (optional).

  • cmd (Optional[str]) – Filter by command type (optional).

  • round_num (Optional[int]) – Filter by round number (optional).

  • limit (Optional[int]) – Limit the number of messages returned (optional).

Return type:

List[dict[str, Any]]

Returns:

Received messages matching the filters.

get_sent_messages(node=None, cmd=None, round_num=None, limit=None)

Get sent messages with optional filtering.

Parameters:
  • node (Optional[str]) – Filter by source node address (optional).

  • cmd (Optional[str]) – Filter by command type (optional).

  • round_num (Optional[int]) – Filter by round number (optional).

  • limit (Optional[int]) – Limit the number of messages returned (optional).

Return type:

List[dict[str, Any]]

Returns:

Sent messages matching the filters.
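
One way to consume the lists these getters return is to total the traffic per command. The sketch below operates on plain dictionaries and assumes each returned entry exposes `cmd` and `package_size` keys that mirror the `add_message` parameters; the actual key names are not confirmed by this page, and `bytes_per_command` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Any, Dict, List


def bytes_per_command(messages: List[Dict[str, Any]]) -> Dict[str, int]:
    """Hypothetical helper: sum package sizes grouped by command."""
    totals: Dict[str, int] = defaultdict(int)
    for msg in messages:
        # A missing or None size counts as 0, since the size is only
        # recorded "if available".
        totals[msg["cmd"]] += msg.get("package_size") or 0
    return dict(totals)


# Stand-in for the list a getter might return; all values are made up.
sent = [
    {"cmd": "beat", "package_size": 64},
    {"cmd": "add_model", "package_size": 2048},
    {"cmd": "beat", "package_size": 64},
    {"cmd": "vote", "package_size": None},
]

summary = bytes_per_command(sent)
```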