p2pfl.management.message_storage moduleΒΆ

Message storage for logging communication events.

class p2pfl.management.message_storage.MessageStorage(disable_locks=False)[source]ΒΆ

Bases: object

Message storage. Stores communication events between nodes.

Format:

[
    {
        "timestamp": datetime.datetime,
        "source": "source_node",
        "destination": "dest_node",
        "direction": "sent"/"received",
        "cmd": "command_name",
        "package_type": "message"/"weights",
        "package_size": size_in_bytes,
        "round": round_number,
        "additional_info": {...} or None
    },
]
Parameters:

disable_locks (bool)

add_message(node, direction, cmd, source_dest, package_type, package_size, round_num=None, additional_info=None)[source]ΒΆ

Add a message entry to the storage.

Parameters:
  • node (str) – The node address.

  • direction (str) – Direction of communication (β€œsent” or β€œreceived”).

  • cmd (str) – The command or message type.

  • source_dest (str) – Source (if receiving) or destination (if sending) node.

  • package_type (str) – Type of package (β€œmessage” or β€œweights”).

  • package_size (int) – Size of the package in bytes (if available).

  • round_num (int | None) – The federated learning round number (if applicable).

  • additional_info (dict[str, Any] | None) – Additional information as a dictionary.

Return type:

None

get_messages(node=None, direction=None, cmd=None, round_num=None, limit=None)[source]ΒΆ

Get messages with optional filtering.

Parameters:
  • node (str | None) – Filter by node address (as source or destination) (optional).

  • direction (str | None) – Filter by direction (β€œsent” or β€œreceived”) (optional).

  • cmd (str | None) – Filter by command type (optional).

  • round_num (int | None) – Filter by round number (optional).

  • limit (int | None) – Limit the number of messages returned (optional).

Return type:

list[dict[str, Any]]

Returns:

A list of message dictionaries matching the filters.

get_received_messages(node=None, cmd=None, round_num=None, limit=None)[source]ΒΆ

Get received messages with optional filtering.

Parameters:
  • node (str | None) – Filter by destination node address (optional).

  • cmd (str | None) – Filter by command type (optional).

  • round_num (int | None) – Filter by round number (optional).

  • limit (int | None) – Limit the number of messages returned (optional).

Return type:

list[dict[str, Any]]

Returns:

Received messages matching the filters.

get_sent_messages(node=None, cmd=None, round_num=None, limit=None)[source]ΒΆ

Get sent messages with optional filtering.

Parameters:
  • node (str | None) – Filter by source node address (optional).

  • cmd (str | None) – Filter by command type (optional).

  • round_num (int | None) – Filter by round number (optional).

  • limit (int | None) – Limit the number of messages returned (optional).

Return type:

list[dict[str, Any]]

Returns:

Sent messages matching the filters.