Source code for thread_chunks._chunker

  1import signal
  2from warnings import warn
  3from typing import Any, Callable, List, Optional
  4
  5import ray
  6from ray.types import ObjectRef
  7from tqdm import tqdm as tqdm
  8
  9from . import config
 10from ._remote_actor import RemoteLabelledActor
 11from ._checkpoint import Checkpoint, CheckpointFailedWarning
 12
class Chunker:
    """A collection of
    :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s
    pre-loaded with a function to execute that can be reused. Unlike
    :func:`chunk` this means that the function to parallelise only needs to be
    copied when the function is first executed in parallel and does not need to
    be copied to the
    :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s every time.

    Notes
    -----
    If ``ray`` is not initialised when :class:`Chunker` is initialised then
    :class:`Chunker` will initialise ``ray``. In this case when :class:`Chunker`
    is deleted it will ``shutdown`` ``ray`` unless the attribute
    :attr:`persistent` is set to ``True``.
    """
    actors: List[ObjectRef]
    """The :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s the
    :class:`Chunker` will use to execute tasks."""
    progress_bar: bool
    "Whether to display a progress bar in the terminal."
    persistent: bool
    """Whether to allow the ``ray`` instance (if created by this
    :class:`Chunker`) to persist after this :class:`Chunker` is deleted."""
    path: Optional[str]
    "The path of the checkpointing file associated with this :class:`Chunker`."

    def __init__(self,
                 func: Optional[Callable] = None,
                 actors: Optional[List[ObjectRef]] = None,
                 chunk_size: int = config.default_chunk_size,
                 progress_bar: bool = config.progress_bar,
                 persistent: bool = False,
                 path: Optional[str] = None):
        """Initialises a :class:`Chunker`.

        Parameters
        ----------
        func : Callable, optional
            The function to be executed in parallel. Note `func` and `actors`
            cannot be passed together. By default ``None``.
        actors : List[ObjectRef], optional
            A list of remote
            :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s.
            If actors is ``None`` then `func` will be used to generate a set of
            `chunk_size`
            :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` s.
            Note `func` and `actors` cannot be passed together. By default
            ``None``.
        chunk_size : int
            The number of threads to launch simultaneously (a chunk). By default
            :attr:`config.default_chunk_size <thread_chunks.config.default_chunk_size>`.
        progress_bar : bool
            Whether to display a progress bar in the terminal. By default
            :attr:`config.progress_bar <thread_chunks.config.progress_bar>`.
        persistent : bool
            Whether to allow the ray instance (if created by this
            :class:`Chunker`) to persist after this :class:`Chunker` is deleted.
            By default ``False``.
        path : str, optional
            The path of the checkpointing file associated with this
            :class:`Chunker`. By default ``None``.

        Raises
        ------
        ValueError
            "Either `func` or `actors` must be specified but not both."
        ValueError
            "`chunk_size` does not agree with the number of `actors`. Note that
            ``len(actors)`` must equal `chunk_size`."
        """
        # Validate before touching ray or setting any attribute, so that a
        # rejected call leaves nothing behind for ``__del__`` to clean up.
        if (func is None) == (actors is None):
            raise ValueError("Either `func` or `actors` must be specified but not both.")
        if actors is not None and len(actors) != chunk_size:
            raise ValueError("`chunk_size` does not agree with the number of `actors`. Note that ``len(actors)`` must equal `chunk_size`.")

        self.progress_bar = progress_bar
        self.persistent = persistent
        self.path = path

        # NOTE: despite its name this flag is ``True`` when ray was ALREADY
        # running before this Chunker was created (i.e. this Chunker does not
        # own the ray instance) and ``False`` when this Chunker started it.
        self._ray_initialized_by_this_chunker: bool = ray.is_initialized()
        if not self._ray_initialized_by_this_chunker:
            ray.init(**config.ray_config, ignore_reinit_error=True)

        # Actors are created only once ray is running with the project's
        # configuration.
        if actors is None:
            actors = [RemoteLabelledActor.remote(func)
                      for _ in range(chunk_size)]
        self.actors = actors

    @property
    def chunk_size(self) -> int:
        "The number of threads to launch simultaneously (a chunk)."
        return len(self.actors)

    def __call__(self,
                 parameters: List[List[Any]],
                 func: Optional[Callable] = None,
                 progress_bar: Optional[bool] = None,
                 path: Optional[str] = None
                 ) -> List[Any]:
        """The functions stored in each
        :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` in
        :attr:`actors` is called in parallel, a chunk at a time, for each set of
        ``*args`` in `parameters` and returns the outputs in an ordered
        ``list``. As only `chunk_size` threads are running `func` at any given
        time memory intensive operations will not exhaust the RAM capacity.

        Alternatively, `func` can be passed to override the functions stored in
        each :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` of
        :attr:`actors`.

        Parameters
        ----------
        parameters : List[List[Any]]
            A list of ``*args`` to pass to the function to be executed in
            parallel.
        func : Callable, optional
            The function to be executed in parallel. If ``None`` then the
            function stored in each
            :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>`
            in :attr:`actors` is used. Else the functions stored in the
            :attr:`actors` are replaced with `func`. By default ``None``.
        progress_bar : bool, optional
            Whether to display a progress bar in the terminal. If ``None`` is
            passed then :attr:`progress_bar` is used. By default ``None``.
        path : str, optional
            The path to save the checkpoints to. If ``None`` is passed then
            :attr:`path` is used. By default ``None``.

        Returns
        -------
        List[Any]
            A List of the outputs of `func`. The ``i`` th element corresponds to
            the ``i`` th element of parameters [``func(*parameters[i])``].

        Warns
        -----
        CheckpointFailedWarning
            The checkpointing file has no saved argument values. Proceeding and
            continuing to save data, but you will not be able to pick up from
            this checkpoint.
        CheckpointFailedWarning
            There was an error while saving the data.
        """
        # Per-call arguments fall back to the values stored on the instance.
        if progress_bar is None:
            progress_bar = self.progress_bar
        if path is None:
            path = self.path
        return chunk(func,
                     parameters,
                     self.actors,
                     self.chunk_size,
                     progress_bar,
                     path)

    def __del__(self):
        """Shuts down ``ray`` if ``ray`` was initialised by this instance of
        :class:`Chunker` and ``persistent==False``.
        """
        # The flag is missing when ``__init__`` raised before ray was touched;
        # in that case there is nothing to shut down.
        if not hasattr(self, "_ray_initialized_by_this_chunker"):
            return
        # ``True`` means ray was already running, so it is not ours to stop.
        if self._ray_initialized_by_this_chunker:
            return
        if getattr(self, "persistent", False):
            return
        ray.shutdown()
165
def chunk(func: Optional[Callable],
          parameters: List[List[Any]],
          actors: Optional[List[ObjectRef]] = None,
          chunk_size: int = config.default_chunk_size,
          progress_bar: bool = config.progress_bar,
          path: Optional[str] = None
          ) -> List[Any]:
    """Calls `func` in parallel, a chunk at a time, for each set of ``*args`` in
    `parameters` and returns the outputs in an ordered ``list``. As only
    `chunk_size` threads are running `func` at any given time memory intensive
    operations will not exhaust the RAM capacity.

    Alternatively, `actors` can be passed a list of
    :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` if `func` is passed
    ``None`` and the functions stored in each
    :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` will be used.

    Parameters
    ----------
    func : Optional[Callable]
        The function to parallelize. If ``None`` then the function saved in the
        `actors` is used.
    parameters : List[List[Any]]
        A list of ``*args`` to be passed to `func`.
    actors: Optional[List[ObjectRef]]
        A list of ray actors of type
        :class:`RemoteLabelledActor <thread_chunks.RemoteLabelledActor>` to use for the
        computation. By default ``None``.
    chunk_size : int, optional
        The number of threads to launch simultaneously (a chunk). By default
        :attr:`config.default_chunk_size <thread_chunks.config.default_chunk_size>`.
    progress_bar : bool, optional
        Whether to display a progress bar in the terminal. By default
        :attr:`config.progress_bar <thread_chunks.config.progress_bar>`.
    path : str, optional
        If specified then after each new data point a checkpoint is saved to the
        `path`. If the checkpoint file exists when :func:`chunk` is called then
        :func:`chunk` will pick up from the checkpoint.

    Returns
    -------
    List[Any]
        A List of the outputs of `func`. The ith element corresponds to the ith
        element of parameters [``func(*parameters[i])``].

    Raises
    ------
    ValueError
        "`func` and `actors` cannot both be ``None``."
    ValueError
        "`chunk_size` does not agree with the number of `actors`. Note that
        ``len(actors)`` must equal `chunk_size`."

    Warns
    -----
    CheckpointFailedWarning
        The checkpointing file has no saved argument values. Proceeding and
        continuing to save data, but you will not be able to pick up from this
        checkpoint.
    CheckpointFailedWarning
        There was an error while saving the data.
    """
    # Argument validation happens before ray is started so that a bad call
    # never spins up (and then tears down) a ray instance.
    if func is None and actors is None:
        raise ValueError("`func` and `actors` cannot both be ``None``.")
    if actors is not None and len(actors) != chunk_size:
        raise ValueError("`chunk_size` does not agree with the number of `actors`. Note that ``len(actors)`` must equal `chunk_size`.")

    # Initialising ray if it has not yet been initialised.
    ray_initialized: bool = ray.is_initialized()
    if not ray_initialized:
        ray.init(**config.ray_config)

    pbar = None
    try:
        pending_updates = None
        if func is not None:
            func_name = func.__name__
            # Copying the `func` to the object store once; every actor then
            # receives the same reference.
            func_ref = ray.put(func)
            if actors is None:
                # Creating the `actors`
                actors = [RemoteLabelledActor.remote(func_ref)
                          for _ in range(chunk_size)]
            else:
                # Updating the functions stored by the actors.
                pending_updates = [a.set_func.remote(func_ref) for a in actors]
        else:
            # `actors` is guaranteed to be set by the validation above.
            func_name = ray.get(actors[0].get_func.remote()).__name__

        num_params = len(parameters)
        # Loading checkpoint if it exists
        checkpoint, loaded_successfully = Checkpoint.loadifparams(
            func_name,
            parameters,
            [None]*num_params,
            [False]*num_params,
            index=0,
            done=0,
            path=path)
        # Maps a position in the (possibly masked) `parameters` list back to
        # its position in the caller's original list.
        index_map = list(range(num_params))
        if loaded_successfully:
            print("Picking up from previous checkpoint.")
            # Create the Boolean mask "`func` has not been executed".
            mask = [not completed for completed in checkpoint.completed]
            # Masking the parameters to get only the parameters left to
            # execute `func` for.
            parameters = [p for i, p in enumerate(parameters) if mask[i]]
            # Masking the ``index_map``.
            index_map = [i for i in index_map if mask[i]]

        # Initialising the index of the next parameters to run `func` for
        # after a task in the current chunk completes.
        checkpoint.index = chunk_size
        # Initialising the current chunk
        initial_chunk = parameters[:checkpoint.index]

        if progress_bar:
            pbar = tqdm(total=num_params, initial=checkpoint.done)
            cout = pbar.write
        else:
            cout = print

        if pending_updates is not None:
            # Waiting for function copy to complete
            ray.get(pending_updates)
        # Setting off the initial chunk. Each task is labelled with
        # (index in the original parameters, index of the actor running it).
        threads = [actors[i].run.remote((index_map[i], i), *args)
                   for i, args in enumerate(initial_chunk)]

        while threads:  # Loops until no threads left computing
            # Collect all completed threads without waiting.
            # NOTE(review): ``timeout=0`` makes this a polling loop; consider
            # blocking for the first result if CPU usage of the driver
            # matters.
            done_threads, threads = ray.wait(threads,
                                             num_returns=len(threads),
                                             timeout=0)
            # Collect the outputs of the done threads
            values = ray.get(done_threads)
            num_values = len(values)
            if num_values != 0:  # If any threads finished
                if pbar is not None:
                    pbar.set_description(f"Results per iteration: {num_values}")
                try:
                    if path is not None:
                        # Save data to file.
                        # Block interrupt signals while saving is in progress.
                        signal.pthread_sigmask(signal.SIG_BLOCK,
                                               [signal.SIGINT])
                        cout("Interrupt signals have been blocked while the data is saved.")
                    for value in values:
                        result_index = value[0][0]
                        # Updating checkpoint
                        checkpoint.output[result_index] = value[1]
                        checkpoint.completed[result_index] = True
                        checkpoint.done += 1
                        # Initialising new threads before saving so that these
                        # can run in the background if the saving is slow.
                        if checkpoint.index < len(parameters):
                            # Getting the index of the actor that completed
                            # its execution.
                            actor_index = value[0][1]
                            # Giving the actor the next task in the list.
                            thread = actors[actor_index].run.remote(
                                (index_map[checkpoint.index], actor_index),
                                *parameters[checkpoint.index])
                            threads.append(thread)
                            checkpoint.index += 1
                        if path is not None:
                            if not checkpoint.update_save():
                                warn("The checkpointing file has no saved argument values. Proceeding and continuing to save data, but you will not be able to pick up from this checkpoint.",
                                     CheckpointFailedWarning)
                        if pbar is not None:
                            pbar.update(1)
                except BaseException:
                    if path is not None:
                        warn("There was an error while saving the data.",
                             CheckpointFailedWarning)
                    # Bare ``raise`` keeps the original traceback intact.
                    raise
                else:
                    if path is not None:
                        cout("Data has saved successfully.")
                finally:
                    if path is not None:
                        # Unblock first so that a failure while printing can
                        # never leave SIGINT blocked.
                        signal.pthread_sigmask(signal.SIG_UNBLOCK,
                                               [signal.SIGINT])
                        cout("Interrupt signals have been unblocked.")
            # Dropping the references so ray can release the results.
            del done_threads
            del values
        del threads
        del actors
    finally:
        if pbar is not None:
            pbar.close()
        # Shutting down ray if this function initialised it.
        if not ray_initialized:
            ray.shutdown()
    return checkpoint.output