p2pfl.learning.frameworks.simulation.actor_pool moduleΒΆ
Actor pool for distributed computing using Ray.
- class p2pfl.learning.frameworks.simulation.actor_pool.SuperActorPool(resources=None, actor_list=None)[source]ΒΆ
Bases:
ActorPool
SuperActorPool extends ActorPool to manage a pool of VirtualLearnerActor instances for asynchronous distributed computing using Ray.
- _instanceΒΆ
Singleton instance of SuperActorPool.
- Type:
- _lockΒΆ
Lock for thread-safe instance creation.
- Type:
threading.Lock
- resourcesΒΆ
Resources for actor creation.
- Type:
dict
- _addr_to_futureΒΆ
Mapping from actor address to future information.
- Type:
dict
- actor_to_removeΒΆ
Set of actor IDs scheduled for removal.
- Type:
set
- num_actorsΒΆ
Number of active actors in the pool.
- Type:
int
- lockΒΆ
Reentrant lock for thread-safe operations.
- Type:
threading.RLock
- initializedΒΆ
Flag indicating initialization status.
- Type:
bool
- add_actor(num_actors)[source]ΒΆ
Add a specified number of actors to the pool.
- Parameters:
num_actors (
int
) β Number of actors to add.- Return type:
None
- create_actor()[source]ΒΆ
Create a new VirtualLearnerActor instance using provided resources.
- Return type:
ActorClass(VirtualLearnerActor)
- Returns:
New actor instance.
- get_learner_result(addr, timeout)[source]ΒΆ
Retrieve the learner result associated with the given address.
- Parameters:
addr (
str
) β Address of the learner result to retrieve.timeout (
Optional
[float
]) β Timeout for retrieving the result. Defaults to None.
- Return type:
Tuple
[Any
,Any
]- Returns:
Address and result of the learner job.
- process_unordered_future(timeout=None)[source]ΒΆ
Process the next unordered future result from the pool.
- Parameters:
timeout (
Optional
[float
]) β Timeout for processing the future. Defaults to None.- Raises:
StopIteration β If no more results are available.
TimeoutError β If the future processing times out.
- Return type:
None
- submit(fn, value)[source]ΒΆ
Submit a task to an idle actor in the pool.
- Parameters:
fn (
Any
) β Function to be executed by the actor.value (
Tuple
[str
,Learner
]) β Tuple containing address and learner information.
- Return type:
None
- submit_learner_job(actor_fn, job)[source]ΒΆ
Submit a learner job to the pool, handling pending submits if no idle actors are available.
- Parameters:
actor_fn (
Any
) β Function to be executed by the actor.job (
Tuple
[str
,Learner
]) β Tuple containing address and learner information.
- Return type:
None