p2pfl.management.message_storage module
Message storage for logging communication events.
- class p2pfl.management.message_storage.MessageStorage(disable_locks=False)
Bases: object
Message storage. Stores communication events between nodes.
Format:
    [
        {
            "timestamp": datetime.datetime,
            "source": "source_node",
            "destination": "dest_node",
            "direction": "sent"/"received",
            "cmd": "command_name",
            "package_type": "message"/"weights",
            "package_size": size_in_bytes,
            "round": round_number,
            "additional_info": {...} or None
        },
    ]
- Parameters:
disable_locks (bool)
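As an illustration of the entry format documented for the class, the following sketch builds one entry by hand and checks it against the listed keys. The values (node names, command name, size) and the `looks_like_entry` helper are hypothetical and are not part of p2pfl.

```python
from datetime import datetime
from typing import Any

# Keys listed in the documented entry format.
ENTRY_KEYS = {
    "timestamp", "source", "destination", "direction", "cmd",
    "package_type", "package_size", "round", "additional_info",
}

# Hypothetical entry; the node names, command name, and size are
# made-up illustration values, not output captured from p2pfl.
entry: dict[str, Any] = {
    "timestamp": datetime(2024, 1, 1, 12, 0, 0),
    "source": "node-a",
    "destination": "node-b",
    "direction": "sent",        # "sent" or "received"
    "cmd": "example_cmd",
    "package_type": "weights",  # "message" or "weights"
    "package_size": 2048,       # size in bytes
    "round": 3,
    "additional_info": None,    # or a dict with extra details
}


def looks_like_entry(candidate: dict[str, Any]) -> bool:
    """Hypothetical check: documented keys present and enumerated values valid."""
    return (
        set(candidate) == ENTRY_KEYS
        and isinstance(candidate["timestamp"], datetime)
        and candidate["direction"] in ("sent", "received")
        and candidate["package_type"] in ("message", "weights")
    )
```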
- add_message(node, direction, cmd, source_dest, package_type, package_size, round_num=None, additional_info=None)
Add a message entry to the storage.
- Parameters:
node (str) – The node address.
direction (str) – Direction of communication ("sent" or "received").
cmd (str) – The command or message type.
source_dest (str) – Source (if receiving) or destination (if sending) node.
package_type (str) – Type of package ("message" or "weights").
package_size (int) – Size of the package in bytes (if available).
round_num (int | None) – The federated learning round number (if applicable).
additional_info (dict[str, Any] | None) – Additional information as a dictionary.
- Return type:
None
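The sketch below shows one way the `add_message` arguments could map onto the documented entry format. It assumes that `node` becomes the source when sending and the destination when receiving, which is inferred from the `source_dest` description rather than confirmed by the library. `build_entry` is a hypothetical helper, and its direction check belongs to this sketch only.

```python
from datetime import datetime
from typing import Any, Optional


def build_entry(
    node: str,
    direction: str,
    cmd: str,
    source_dest: str,
    package_type: str,
    package_size: int,
    round_num: Optional[int] = None,
    additional_info: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Hypothetical helper: turn add_message-style arguments into one entry."""
    if direction not in ("sent", "received"):
        raise ValueError(f"unexpected direction: {direction!r}")
    # Assumption: the local node is the source of a sent message and the
    # destination of a received one; source_dest is the other endpoint.
    if direction == "sent":
        source, destination = node, source_dest
    else:
        source, destination = source_dest, node
    return {
        "timestamp": datetime.now(),
        "source": source,
        "destination": destination,
        "direction": direction,
        "cmd": cmd,
        "package_type": package_type,
        "package_size": package_size,
        "round": round_num,
        "additional_info": additional_info,
    }
```

With this mapping, `build_entry("node-a", "received", "vote", "node-b", "message", 128)` records `node-b` as the source and `node-a` as the destination.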
- get_messages(node=None, direction=None, cmd=None, round_num=None, limit=None)
Get messages with optional filtering.
- Parameters:
node (str | None) – Filter by node address (as source or destination) (optional).
direction (str | None) – Filter by direction ("sent" or "received") (optional).
cmd (str | None) – Filter by command type (optional).
round_num (int | None) – Filter by round number (optional).
limit (int | None) – Limit the number of messages returned (optional).
- Return type:
list[dict[str, Any]]
- Returns:
A list of message dictionaries matching the filters.
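The filter semantics described above can be sketched over a plain list of entries. `filter_messages` is a hypothetical stand-in, not the library's implementation. In particular, it assumes that a non-negative `limit` keeps the first matches in insertion order; the library may order or truncate differently. Only the fields the filters read are shown in the sample data.

```python
from typing import Any, Optional


def filter_messages(
    messages: list[dict[str, Any]],
    node: Optional[str] = None,
    direction: Optional[str] = None,
    cmd: Optional[str] = None,
    round_num: Optional[int] = None,
    limit: Optional[int] = None,
) -> list[dict[str, Any]]:
    """Hypothetical stand-in for the documented get_messages filters."""
    matches = []
    for msg in messages:
        # node matches when it is either endpoint of the message.
        if node is not None and node not in (msg["source"], msg["destination"]):
            continue
        if direction is not None and msg["direction"] != direction:
            continue
        if cmd is not None and msg["cmd"] != cmd:
            continue
        if round_num is not None and msg["round"] != round_num:
            continue
        matches.append(msg)
    # Assumption: limit keeps the first matches in insertion order.
    return matches if limit is None else matches[:limit]


# Made-up sample entries (timestamp and size fields omitted for brevity).
log = [
    {"source": "node-a", "destination": "node-b", "direction": "sent",
     "cmd": "vote", "round": 1},
    {"source": "node-b", "destination": "node-a", "direction": "received",
     "cmd": "vote", "round": 1},
    {"source": "node-c", "destination": "node-b", "direction": "received",
     "cmd": "add_model", "round": 2},
]

involving_a = filter_messages(log, node="node-a")
first_vote = filter_messages(log, cmd="vote", limit=1)
```

Filters left as `None` are ignored, so calling the function with no filters returns every entry.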
- get_received_messages(node=None, cmd=None, round_num=None, limit=None)
Get received messages with optional filtering.
- Parameters:
node (str | None) – Filter by destination node address (optional).
cmd (str | None) – Filter by command type (optional).
round_num (int | None) – Filter by round number (optional).
limit (int | None) – Limit the number of messages returned (optional).
- Return type:
list[dict[str, Any]]
- Returns:
Received messages matching the filters.
- get_sent_messages(node=None, cmd=None, round_num=None, limit=None)
Get sent messages with optional filtering.
- Parameters:
node (str | None) – Filter by source node address (optional).
cmd (str | None) – Filter by command type (optional).
round_num (int | None) – Filter by round number (optional).
limit (int | None) – Limit the number of messages returned (optional).
- Return type:
list[dict[str, Any]]
- Returns:
Sent messages matching the filters.
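According to the parameter descriptions, the two direction-specific getters differ in which endpoint `node` is compared against: the source for sent messages and the destination for received ones. The sketch below illustrates that difference over a plain list of entries. `filter_one_direction` is a hypothetical function, and taking the first `limit` matches is an assumption.

```python
from typing import Any, Optional


def filter_one_direction(
    messages: list[dict[str, Any]],
    direction: str,
    node: Optional[str] = None,
    cmd: Optional[str] = None,
    round_num: Optional[int] = None,
    limit: Optional[int] = None,
) -> list[dict[str, Any]]:
    """Hypothetical sketch of the sent/received convenience filters."""
    # Sent messages are matched on their source node, received messages
    # on their destination node, following the parameter descriptions.
    endpoint = "source" if direction == "sent" else "destination"
    matches = [
        msg for msg in messages
        if msg["direction"] == direction
        and (node is None or msg[endpoint] == node)
        and (cmd is None or msg["cmd"] == cmd)
        and (round_num is None or msg["round"] == round_num)
    ]
    # Assumption: limit keeps the first matches in insertion order.
    return matches if limit is None else matches[:limit]


# Made-up sample entries (timestamp and size fields omitted for brevity).
log = [
    {"source": "node-a", "destination": "node-b", "direction": "sent",
     "cmd": "vote", "round": 1},
    {"source": "node-b", "destination": "node-a", "direction": "received",
     "cmd": "vote", "round": 1},
    {"source": "node-a", "destination": "node-c", "direction": "sent",
     "cmd": "add_model", "round": 2},
]

sent_by_a = filter_one_direction(log, "sent", node="node-a")
received_by_a = filter_one_direction(log, "received", node="node-a")
```

Here `node-b` appears only as the destination of a sent message, so filtering sent messages by `node="node-b"` yields an empty list.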