Actors
What are actors?
thread-chunks utilises Ray actors to execute functions in parallel. When chunk() is called with a function f and a chunk size of chunk_size, f is copied to chunk_size actors. Actors are remote Python classes. The class used by thread-chunks is LabelledActor, and its remote version (RemoteLabelledActor) is defined as
RemoteLabelledActor = ray.remote(LabelledActor)
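To make the idea of "a class that holds a function" concrete, here is a minimal plain-Python sketch of the kind of class that could be wrapped this way. It is not the LabelledActor source: `ToyLabelledActor`, its `label` attribute, and its `set_function` and `run` methods are hypothetical names chosen for illustration, and the real interface may differ.

```python
from typing import Any, Callable, Optional, Tuple


class ToyLabelledActor:
    """Hypothetical stand-in: stores a function and tags each result with a label."""

    def __init__(self, function: Optional[Callable[..., Any]] = None, label: int = 0):
        self.function = function
        self.label = label

    def set_function(self, function: Callable[..., Any]) -> None:
        # Replace the stored function. With a Ray actor, the function would be
        # serialised and sent to the actor process at this point.
        self.function = function

    def run(self, *args: Any, **kwargs: Any) -> Tuple[int, Any]:
        if self.function is None:
            raise RuntimeError("no function has been stored in this actor")
        # Return the label with the result so outputs can be matched to inputs.
        return self.label, self.function(*args, **kwargs)


def square(x: int) -> int:
    return x * x


actor = ToyLabelledActor(square, label=3)
label, result = actor.run(4)
```

Wrapping a class like this with ray.remote, as in the line above, would give a remote version whose instances are created with `.remote(...)` and live in their own worker processes.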
Reusing actors
Every time we call chunk(), new actors are initialised and f is copied to them. If f takes up a lot of memory, this copy may be slow, so it makes sense to reuse the actors. To do so, we can initialise an instance of Chunker
from thread_chunks import Chunker
chunker = Chunker(f, chunk_size=chunk_size)
which generates chunk_size actors and pre-loads them with the function f. We can then call
output = chunker(parameters)
as many times as we like without copying f. The output is equivalent to
output = chunk(f, parameters, chunk_size=chunk_size)
If we call
output = chunker(parameters, g)
the function stored in the actors will be replaced with g, which requires g to be copied to all the actors. However, subsequent calls of
output = chunker(parameters)
will then execute g, as g is now the saved function in the actors.
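The reuse semantics described above can be imitated with a small plain-Python sketch. This is not the thread-chunks implementation: `ToyChunker` is a hypothetical class that uses a thread pool in place of Ray actors, and it assumes that parameters is a sequence of argument tuples, which may not match the format the library expects.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Optional, Sequence


class ToyChunker:
    """Hypothetical stand-in for the reuse pattern, built on a thread pool."""

    def __init__(self, function: Callable[..., Any], chunk_size: int = 2):
        self.function = function
        self.chunk_size = chunk_size
        # Created once and reused by every call, like the pre-loaded actors.
        self.pool = ThreadPoolExecutor(max_workers=chunk_size)

    def __call__(self, parameters: Sequence[Sequence[Any]],
                 function: Optional[Callable[..., Any]] = None) -> List[Any]:
        if function is not None:
            # A new function replaces the stored one for this and later calls.
            self.function = function
        output: List[Any] = []
        # Work through the parameters one chunk at a time.
        for start in range(0, len(parameters), self.chunk_size):
            chunk = parameters[start:start + self.chunk_size]
            futures = [self.pool.submit(self.function, *args) for args in chunk]
            output.extend(future.result() for future in futures)
        return output

    def close(self) -> None:
        self.pool.shutdown()


def f(x: int, y: int) -> int:
    return x + y


def g(x: int, y: int) -> int:
    return x * y


parameters = [(1, 2), (3, 4), (5, 6)]
chunker = ToyChunker(f, chunk_size=2)
first = chunker(parameters)      # runs f
second = chunker(parameters, g)  # replaces f with g, then runs g
third = chunker(parameters)      # no function passed, so g runs again
chunker.close()
```

The point of the design is that the expensive setup (here the pool, in thread-chunks the actors and the copy of f) happens once in the constructor, while each call only pays for the work itself.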
The actors can also be extracted from chunker and passed to chunk as follows:
actors = chunker.actors
output = chunk(None, parameters, actors, chunk_size=chunk_size)
Note that now chunk() will use the function stored in the actors.
As before, if we pass a new function h together with the actors to chunk(), the function stored in the actors will be overwritten. That is,
output = chunk(h, parameters, actors, chunk_size=chunk_size)
will execute h in parallel using the passed actors.
Ray initialisations
In a similar vein to reusing actors, we can also reuse Ray initialisations to decrease wall-clock times. If Ray is not initialised when chunk() is called, chunk() will initialise Ray and shut it down when it has completed. However, if Ray was initialised before calling chunk(), chunk() will not shut Ray down when it has completed.
Similarly, when an instance of Chunker is initialised, Ray will be initialised if it has not been already. When the instance is deleted, Ray will be shut down only if it was initialised by that instance. However, if the argument persistent is passed as True when initialising Chunker, Ray will not be shut down when the instance is deleted.
Finally, if Ray options other than those in config.ray_config are desired, Ray should be initialised before using chunk() or Chunker.
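The rule "shut down only what you started" can be sketched as a small context manager. This is an illustration of the pattern under stated assumptions, not the code used by thread-chunks: `owned_runtime` and `FakeRuntime` are hypothetical names, and the fake runtime stands in for Ray so that the example runs without a cluster.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def owned_runtime(is_initialised: Callable[[], bool],
                  init: Callable[[], None],
                  shutdown: Callable[[], None],
                  persistent: bool = False) -> Iterator[bool]:
    """Start the runtime only if needed and stop it only if we started it."""
    started_here = not is_initialised()
    if started_here:
        init()
    try:
        yield started_here
    finally:
        # Leave the runtime alone if someone else started it
        # or if persistence was requested.
        if started_here and not persistent:
            shutdown()


class FakeRuntime:
    """Hypothetical stand-in for Ray, tracking only whether it is running."""

    def __init__(self) -> None:
        self.running = False

    def is_initialised(self) -> bool:
        return self.running

    def init(self) -> None:
        self.running = True

    def shutdown(self) -> None:
        self.running = False


runtime = FakeRuntime()

# Case 1: nothing is running, so the block starts and stops the runtime itself.
with owned_runtime(runtime.is_initialised, runtime.init, runtime.shutdown) as started:
    started_when_idle = started
running_after_owned = runtime.running

# Case 2: the caller initialised first, so the runtime is left running afterwards.
runtime.init()
with owned_runtime(runtime.is_initialised, runtime.init, runtime.shutdown) as started:
    started_when_running = started
running_after_borrowed = runtime.running
```

With Ray itself, the three callables would correspond to ray.is_initialized, ray.init and ray.shutdown, and initialising Ray yourself beforehand is what lets you supply your own options.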
Custom actors
Finally, thread-chunks can be extended by creating a custom remote actor that inherits from LabelledActor:
@ray.remote
class CustomRemoteLabelledActor(LabelledActor):
    ...
We can then pass these custom actors to chunk():
actors = [CustomRemoteLabelledActor.remote(f) for _ in range(chunk_size)]
output = chunk(None, parameters, actors, chunk_size=chunk_size)
or to Chunker:
chunker = Chunker(actors=actors, chunk_size=chunk_size)
Note that the function f is pre-loaded into the actors. We can also change the pre-loaded function to another function, say g, as follows:
output = chunk(g, parameters, actors, chunk_size=chunk_size)
Now you know everything you need to go and use thread-chunks!