1import signal
2from warnings import warn
3from typing import Any, Callable, List, Optional
4
5import ray
6from ray.types import ObjectRef
7from tqdm import tqdm as tqdm
8
9from . import config
10from ._remote_actor import RemoteLabelledActor
11from ._checkpoint import Checkpoint, CheckpointFailedWarning
12
class Chunker:
    """A collection of
    :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s
    pre-loaded with a function to execute that can be reused. Unlike
    :func:`chunk` this means that the function to parallelise only needs to be
    copied when the function is first executed in parallel and does not need to
    be copied to the
    :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s every time.

    Notes
    -----
    If ``ray`` is not initialised when :class:`Chunker` is initialised then
    :class:`Chunker` will initialise ``ray`` (with ``config.ray_config``)
    before creating any actors. In this case when :class:`Chunker` is deleted
    it will ``shutdown`` ``ray`` unless the attribute :attr:`persistent` is set
    to ``True``.
    """
    actors: List[ObjectRef]
    """The :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s the
    :class:`Chunker` will use to execute tasks."""
    progress_bar: bool
    "Whether to display a progress bar in the terminal."
    persistent: bool
    """Whether to allow the ``ray`` instance (if created by this
    :class:`Chunker`) to persist after this :class:`Chunker` is deleted."""
    path: Optional[str]
    "The path of the checkpointing file associated with this :class:`Chunker`."

    def __init__(self,
                 func: Optional[Callable] = None,
                 actors: Optional[List[ObjectRef]] = None,
                 chunk_size: int = config.default_chunk_size,
                 progress_bar: bool = config.progress_bar,
                 persistent: bool = False,
                 path: Optional[str] = None):
        """Initialises a :class:`Chunker`.

        Parameters
        ----------
        func : Callable, optional
            The function to be executed in parallel. Note `func` and `actors`
            cannot be passed together. By default ``None``.
        actors : List[ObjectRef], optional
            A list of remote
            :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s.
            If actors is ``None`` then `func` will be used to generate a set of
            `chunk_size`
            :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s.
            Note `func` and `actors` cannot be passed together. By default
            ``None``.
        chunk_size : int
            The number of threads to launch simultaneously (a chunk). By default
            :attr:`config.default_chunk_size <thread_chunks.config.default_chunk_size>`.
        progress_bar : bool
            Whether to display a progress bar in the terminal. By default
            :attr:`config.progress_bar <thread_chunks.config.progress_bar>`.
        persistent : bool
            Whether to allow the ray instance (if created by this
            :class:`Chunker`) to persist after this :class:`Chunker` is deleted.
            By default ``False``.
        path : str, optional
            The path of the checkpointing file associated with this
            :class:`Chunker`. By default ``None``.

        Raises
        ------
        ValueError
            "Either `func` or `actors` must be specified but not both."
        ValueError
            "`chunk_size` does not agree with the number of `actors`. Note that
            ``len(actors)`` must equal `chunk_size`."
        """
        # Every attribute ``__del__`` reads is set before anything can raise,
        # so a failed construction never shuts down a ``ray`` it did not start.
        # True only once this instance has itself called ``ray.init``.
        self._ray_initialized_by_this_chunker: bool = False
        self.persistent = persistent
        self.progress_bar = progress_bar
        self.path = path

        # Validate before any side effect (starting ``ray`` or creating actors).
        if (func is None) == (actors is None):
            raise ValueError("Either `func` or `actors` must be specified but not both.")
        if actors is not None and len(actors) != chunk_size:
            raise ValueError("`chunk_size` does not agree with the number of `actors`. Note that ``len(actors)`` must equal `chunk_size`.")

        # ``ray`` must be running with ``config.ray_config`` *before* any actor
        # is created; otherwise creating an actor would start (or require)
        # ``ray`` without that configuration.
        if not ray.is_initialized():
            ray.init(**config.ray_config, ignore_reinit_error=True)
            self._ray_initialized_by_this_chunker = True

        if actors is None:
            # Put `func` into the object store once and share the reference,
            # as :func:`chunk` does, instead of serialising it per actor.
            func_ref = ray.put(func)
            actors = [RemoteLabelledActor.remote(func_ref)
                      for _ in range(chunk_size)]
        self.actors = actors

    @property
    def chunk_size(self) -> int:
        "The number of threads to launch simultaneously (a chunk)."
        return len(self.actors)

    def __call__(self,
                 parameters: List[List[Any]],
                 func: Optional[Callable] = None,
                 progress_bar: Optional[bool] = None,
                 path: Optional[str] = None
                 ) -> List[Any]:
        """The functions stored in each
        :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` in
        :attr:`actors` is called in parallel, a chunk at a time, for each set of
        ``*args`` in `parameters` and returns the outputs in an ordered
        ``list``. As only `chunk_size` threads are running `func` at any given
        time memory intensive operations will not exhaust the RAM capacity.

        Alternatively, `func` can be passed to override the functions stored in
        each :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` of
        :attr:`actors`.

        Parameters
        ----------
        parameters : List[List[Any]]
            A list of ``*args`` to pass to the function to be executed in
            parallel.
        func : Callable, optional
            The function to be executed in parallel. If ``None`` then the
            function stored in each
            :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>`
            in :attr:`actors` is used. Else the functions stored in the
            :attr:`actors` are replaced with `func`. By default ``None``.
        progress_bar : bool, optional
            Whether to display a progress bar in the terminal. If ``None`` is
            passed then :attr:`progress_bar` is used. By default ``None``.
        path : str, optional
            The path to save the checkpoints to. If ``None`` is passed then
            :attr:`path` is used. By default ``None``.

        Returns
        -------
        List[Any]
            A List of the outputs of `func`. The ``i`` th element corresponds to
            the ``i`` th element of parameters [``func(*parameters[i])``].

        Warns
        -----
        CheckpointFailedWarning
            The checkpointing file has no saved argument values. Proceeding and
            continuing to save data, but you will not be able to pick up from
            this checkpoint.
        CheckpointFailedWarning
            There was an error while saving the data.
        """
        if progress_bar is None:
            progress_bar = self.progress_bar
        if path is None:
            path = self.path
        return chunk(func,
                     parameters,
                     actors=self.actors,
                     chunk_size=self.chunk_size,
                     progress_bar=progress_bar,
                     path=path)

    def __del__(self):
        """Shuts down ``ray`` if ``ray`` was initialised by this instance of
        :class:`Chunker` and ``persistent==False``.
        """
        # ``getattr`` defaults keep this safe on a partially-constructed
        # instance (e.g. when ``__init__`` raised).
        if (getattr(self, "_ray_initialized_by_this_chunker", False)
                and not getattr(self, "persistent", False)):
            ray.shutdown()
def chunk(func: Optional[Callable],
          parameters: List[List[Any]],
          actors: Optional[List[ObjectRef]] = None,
          chunk_size: int = config.default_chunk_size,
          progress_bar: bool = config.progress_bar,
          path: Optional[str] = None
          ) -> List[Any]:
    """Calls `func` in parallel, a chunk at a time, for each set of ``*args`` in
    `parameters` and returns the outputs in an ordered ``list``. As only
    `chunk_size` threads are running `func` at any given time memory intensive
    operations will not exhaust the RAM capacity.

    Alternatively, `actors` can be passed a list of
    :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` if `func`
    is passed ``None`` and the functions stored in each
    :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` will be
    used.

    If ``ray`` is not initialised when :func:`chunk` is called, it is
    initialised with ``config.ray_config`` and shut down again before
    :func:`chunk` returns or raises.

    Parameters
    ----------
    func : Optional[Callable]
        The function to parallelize. If ``None`` then the function saved in the
        `actors` is used.
    parameters : List[List[Any]]
        A list of ``*args`` to be passed to `func`.
    actors: Optional[List[ObjectRef]]
        A list of ray actors of type
        :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` to use
        for the computation. By default ``None``.
    chunk_size : int, optional
        The number of threads to launch simultaneously (a chunk). By default
        :attr:`config.default_chunk_size <thread_chunks.config.default_chunk_size>`.
    progress_bar : bool, optional
        Whether to display a progress bar in the terminal. By default
        :attr:`config.progress_bar <thread_chunks.config.progress_bar>`.
    path : str, optional
        If specified then after each new data point a checkpoint is saved to the
        `path`. If the checkpoint file exists when :func:`chunk` is called then
        :func:`chunk` will pick up from the checkpoint.

    Returns
    -------
    List[Any]
        A List of the outputs of `func`. The ith element corresponds to the ith
        element of parameters [``func(*parameters[i])``].

    Raises
    ------
    ValueError
        "`func` and `actors` cannot both be ``None``."
    ValueError
        "`chunk_size` does not agree with the number of `actors`. Note that
        ``len(actors)`` must equal `chunk_size`."

    Warns
    -----
    CheckpointFailedWarning
        The checkpointing file has no saved argument values. Proceeding and
        continuing to save data, but you will not be able to pick up from this
        checkpoint.
    CheckpointFailedWarning
        There was an error while saving the data.

    Notes
    -----
    When `path` is given, ``SIGINT`` is blocked for the calling thread with
    :func:`signal.pthread_sigmask` while each batch of results is recorded and
    saved. ``pthread_sigmask`` is POSIX-only; where it is unavailable the data
    is still saved but ``SIGINT`` is not deferred.
    """
    # Validate the arguments before starting ``ray`` so that a bad call does
    # not leave a ``ray`` instance running.
    if func is None and actors is None:
        raise ValueError("`func` and `actors` cannot both be ``None``.")
    if actors is not None and len(actors) != chunk_size:
        raise ValueError("`chunk_size` does not agree with the number of `actors`. Note that ``len(actors)`` must equal `chunk_size`.")
    # Initialising ray if it has not yet been initialised.
    ray_initialized: bool = ray.is_initialized()
    if not ray_initialized:
        ray.init(**config.ray_config)
    pbar = None
    try:
        updated_threads = None
        if func is not None:
            func_name = func.__name__
            # Copying `func` to the object store once; all actors share it.
            func_ref = ray.put(func)
            if actors is None:
                # Creating the `actors`.
                actors = [RemoteLabelledActor.remote(func_ref)
                          for _ in range(chunk_size)]
            else:
                # Updating the functions stored by the actors.
                updated_threads = [a.set_func.remote(func_ref) for a in actors]
        else:
            func_name = ray.get(actors[0].get_func.remote()).__name__

        num_params = len(parameters)
        # Loading checkpoint if it exists.
        checkpoint, loaded_successfully = Checkpoint.loadifparams(
            func_name,
            parameters,
            [None] * num_params,
            [False] * num_params,
            index=0,
            done=0,
            path=path)
        # ``index_map[j]`` is the position in the original `parameters` of the
        # ``j`` th parameter set that still has to be executed.
        index_map = list(range(num_params))
        if loaded_successfully:
            print("Picking up from previous checkpoint.")
            # Create the Boolean mask "`func` has not been executed".
            mask = [not completed for completed in checkpoint.completed]
            # Keep only the parameters (and their original indices) that
            # `func` still has to be executed for.
            parameters = [p for i, p in enumerate(parameters) if mask[i]]
            index_map = [i for i in index_map if mask[i]]
        num_remaining = len(parameters)

        # Index (into the possibly masked `parameters`) of the next parameter
        # set to launch once a task of the current chunk completes.
        checkpoint.index = chunk_size
        # Named so as not to shadow this function.
        first_chunk = parameters[:checkpoint.index]

        if progress_bar:
            pbar = tqdm(total=num_params, initial=checkpoint.done)
            cout = pbar.write
        else:
            cout = print

        # ``signal.pthread_sigmask`` is POSIX-only, so only mask SIGINT where it
        # exists.
        mask_sigint = path is not None and hasattr(signal, "pthread_sigmask")

        if updated_threads is not None:
            # Waiting for function copy to complete.
            ray.get(updated_threads)
        # Setting off the initial chunk: actor ``i`` runs the ``i`` th task and
        # is labelled with ``(original index, actor index)``.
        threads = [actors[i].run.remote((index_map[i], i), *args)
                   for i, args in enumerate(first_chunk)]

        # Pre-bound so the cleanup ``del`` below works even if no task runs
        # (empty `parameters` or a fully completed checkpoint).
        done_threads: list = []
        values: list = []
        while threads:  # Loops until no threads left computing.
            # Block until at least one task has finished (rather than
            # busy-polling with ``timeout=0``), then sweep up any others that
            # are already done without waiting.
            done_threads, threads = ray.wait(threads, num_returns=1)
            if threads:
                more_done, threads = ray.wait(threads,
                                              num_returns=len(threads),
                                              timeout=0)
                done_threads.extend(more_done)
            # Collect the outputs of the done threads. Each value is used as
            # ``value[0] == (original index, actor index)`` and
            # ``value[1] == output``.
            values = ray.get(done_threads)
            if pbar is not None:
                pbar.set_description(f"Results per iteration: {len(values)}")
            try:
                if mask_sigint:
                    # Block interrupt signals while saving is in progress.
                    # NOTE(review): the mask applies to this thread only; a
                    # SIGINT delivered to another thread (e.g. one started by
                    # ``ray``) may still interrupt saving -- verify.
                    signal.pthread_sigmask(signal.SIG_BLOCK,
                                           [signal.SIGINT])
                    cout("Interrupt signals have been blocked while the data is saved.")
                for value in values:
                    out_index, actor_index = value[0][0], value[0][1]
                    # Updating checkpoint.
                    checkpoint.output[out_index] = value[1]
                    checkpoint.completed[out_index] = True
                    checkpoint.done += 1
                    # Give the actor that just finished its next task before
                    # saving, so it runs in the background if saving is slow.
                    if checkpoint.index < num_remaining:
                        threads.append(actors[actor_index].run.remote(
                            (index_map[checkpoint.index], actor_index),
                            *parameters[checkpoint.index]))
                        checkpoint.index += 1
                    if path is not None and not checkpoint.update_save():
                        warn("The checkpointing file has no saved argument values. Proceeding and continuing to save data, but you will not be able to pick up from this checkpoint.",
                             CheckpointFailedWarning)
                    if pbar is not None:
                        pbar.update(1)
            except BaseException:
                if path is not None:
                    warn("There was an error while saving the data.",
                         CheckpointFailedWarning)
                raise
            else:
                if path is not None:
                    cout("Data has saved successfully.")
            finally:
                if mask_sigint:
                    cout("Interrupt signals have been unblocked.")
                    signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                           [signal.SIGINT])
        # Release the local object refs and actor handles before a possible
        # ``ray.shutdown()`` below.
        del done_threads, values, threads, actors
    finally:
        if pbar is not None:
            pbar.close()
        # Shutting down ray if this function initialised it.
        if not ray_initialized:
            ray.shutdown()
    return checkpoint.output