SCOOP Utilities
"""A bunch of utilities that prove useful with the SCOOP Parallelization framework""" | |
############################################################################### | |
# BSD 3-Clause License | |
# | |
# Copyright (c) 2023, maharjun | |
# | |
# Redistribution and use in source and binary forms, with or without | |
# modification, are permitted provided that the following conditions are met: | |
# | |
# 1. Redistributions of source code must retain the above copyright notice, this | |
# list of conditions and the following disclaimer. | |
# | |
# 2. Redistributions in binary form must reproduce the above copyright notice, | |
# this list of conditions and the following disclaimer in the documentation | |
# and/or other materials provided with the distribution. | |
# | |
# 3. Neither the name of the copyright holder nor the names of its | |
# contributors may be used to endorse or promote products derived from | |
# this software without specific prior written permission. | |
# | |
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | |
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | |
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | |
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | |
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | |
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
############################################################################### | |
import socket | |
import os | |
import time | |
from typing import Callable | |
import numpy as np | |
from uuid import uuid4 | |
from scoop import futures | |
import scoop | |
# The utils.numpyutils module can be taken from the Numpy utils gist at https://gist.github.com/maharjun
from utils.numpyutils import almost_equal_split

# Set this in case scoop is losing workers due to slow network setup times
# scoop.TIME_BEFORE_LOSING_WORKER = <higher-value>


def scoop_blank_function():
    pass


class ScoopPaddedUniqueFunction:
    """
    A class that ensures the execution of a unique function in a
    SCOOP-parallelized environment with a padding time to distribute the jobs.

    Attributes
    ----------
    function : callable
        The function to be executed.
    unique_name : str
        The unique identifier for this instance of the class.
    pad_atleast : float
        The padding time in seconds before the next job can be executed.
    """

    __jobs_run__ = set()

    def __init__(self, function, unique_name, pad_atleast=5.):
        """
        Parameters
        ----------
        function : callable
            The function to be executed.
        unique_name : str
            The unique identifier for this instance of the class.
        pad_atleast : float, optional
            The padding time in seconds before the next job can be executed,
            by default 5.0. The padding time helps to ensure that the jobs are
            properly distributed across the different workers. It needs to be
            long enough that the jobs can be sent everywhere before the first
            job finishes; this depends on the parallelism setup, which is why
            this parameter is adjustable.
        """
        self.pad_atleast = pad_atleast
        self.function = function
        self.unique_name = unique_name

    def __call__(self, dummy):
        """
        Executes the function and ensures the padding time before
        the next job can be executed. This method also guards against
        re-execution of the same function on the same worker.

        Parameters
        ----------
        dummy : Any
            A dummy argument that is not used in the function but is required
            because the SCOOP framework expects the mapped functions to have
            at least one argument.

        Returns
        -------
        tuple
            A tuple containing the worker's hostname and process ID.
        """
        if self.unique_name in ScoopPaddedUniqueFunction.__jobs_run__:
            time.sleep(self.pad_atleast)
        else:
            init_time = time.time()
            self.function()
            time_elapsed = time.time() - init_time
            if time_elapsed < self.pad_atleast:
                time.sleep(self.pad_atleast - time_elapsed)
            # set.union returns a new set and would not record the job; add the
            # name in place so the guard against re-execution actually works
            ScoopPaddedUniqueFunction.__jobs_run__.add(self.unique_name)
        return (socket.gethostname(), os.getpid())
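
# Note: ScoopPaddedUniqueFunction is the building block used below by
# scoop_exactly_once_on_all_workers. A rough illustration of the idea (a sketch,
# not meant to run at import time): mapping the wrapper over more dummy
# arguments than there are workers makes every worker run `function` once and
# report its (hostname, pid), while the padding keeps fast jobs from piling up
# on a single worker.
#
#     wrapped = ScoopPaddedUniqueFunction(scoop_blank_function, str(uuid4()), pad_atleast=5.)
#     worker_ids = set(futures.map(wrapped, range(32)))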


def scoop_exactly_once_on_all_workers(function, init_estimate_n_workers=10, holdtime=5.):
    """
    Ensures that the given function is executed exactly once on all workers
    in a SCOOP-parallelized environment.

    Parameters
    ----------
    function : callable
        The function to be executed on all workers.
    init_estimate_n_workers : int, optional
        The initial estimate of the number of workers, by default 10.
    holdtime : float, optional
        The padding time in seconds before the next job can be executed,
        by default 5.0.

    Returns
    -------
    set
        A set of tuples containing the workers' hostnames and process IDs.
    """
    if not scoop.IS_RUNNING:
        # Not running under SCOOP: the only "worker" is the current process
        return {(socket.gethostname(), os.getpid())}

    assert scoop.IS_ORIGIN, "scoop_exactly_once_on_all_workers can only be run from the origin worker"

    wrapped_func = ScoopPaddedUniqueFunction(function, str(uuid4()), pad_atleast=holdtime)

    is_verified_count = False
    previous_estimate = 1
    while not is_verified_count:
        curr_n_workers = max(previous_estimate, init_estimate_n_workers)
        map_results = []
        map_results_set = set()
        # Submit progressively more dummy jobs until some worker receives a
        # duplicate; with the padding, this only happens once the reachable
        # workers are all busy
        while len(map_results) == len(map_results_set):
            curr_n_workers += (curr_n_workers + 1) // 2
            map_results = list(futures.map(wrapped_func, range(curr_n_workers)))
            map_results_set = set(map_results)
        if len(map_results_set) == previous_estimate:
            return map_results_set
        else:
            assert len(map_results_set) > previous_estimate, "Nodes lost during estimate!!! This shouldn't happen"
            previous_estimate = len(map_results_set)
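
# Example usage (a sketch): run a one-time, per-worker setup before the main
# computation. `configure_worker` is a hypothetical user-defined function, not
# part of this module.
#
#     def configure_worker():
#         os.environ['OMP_NUM_THREADS'] = '1'
#
#     if __name__ == '__main__':
#         workers = scoop_exactly_once_on_all_workers(configure_worker, init_estimate_n_workers=16)
#         print(f"Ran setup on {len(workers)} workers")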


def get_n_scoop_workers(init_estimate=10, holdtime=5.):
    """
    Runs a dummy task on all workers to try to find the number of available
    SCOOP workers.

    Parameters
    ----------
    init_estimate : int
        Initial estimate of the number of available workers. It is best
        if it is just bigger than the number of available workers.
    holdtime : float
        The holdtime, in seconds, of the dummy task used to probe the
        workers. It needs to be long enough that the jobs can be sent
        everywhere before the first job finishes; this depends on the
        parallelism setup, which is why this parameter is adjustable.
    """
    if not scoop.IS_RUNNING:
        return 1
    assert scoop.IS_ORIGIN, "get_n_scoop_workers can only be run from the origin worker"
    n_scoop_workers = len(scoop_exactly_once_on_all_workers(scoop_blank_function, init_estimate, holdtime))
    return n_scoop_workers
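
# Example usage (a sketch, run from the origin worker under `python -m scoop`):
#
#     if __name__ == '__main__':
#         n_workers = get_n_scoop_workers(init_estimate=16, holdtime=5.)
#         print(f"Found {n_workers} SCOOP workers")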


class BatchParallelizeScoop:
    """
    Takes a function `function` that takes a batched input (with all but the last
    dimension being the batch dimensions). When calling `__call__`, this class
    splits the input_array into `n_scoop_workers` sub-arrays and parallelizes the
    computation of `function` over them.

    Parameters
    ----------
    n_scoop_workers : int
        The number of scoop workers (see ~get_n_scoop_workers to evaluate the
        number of available scoop workers)
    function : Callable
        A function that takes as its first argument a numpy ndarray where all but
        the last dimension of the array represent batch dimensions
    *args, **kwargs
        Additional positional and keyword arguments to be passed to the function

    Methods
    -------
    __call__(input_array):
        The logic is as follows: flatten all but the last dimension of input_array,
        split this array into `n_scoop_workers` chunks with each chunk being almost
        equal in size, run the function `function` on each of these chunks, and
        reassemble the output to the original batch shape.
    """
    def __init__(self, n_scoop_workers: int, function: Callable, *args, **kwargs):
        from functools import partial
        self.func_partial = partial(function, *args, **kwargs)
        self.n_scoop_workers = n_scoop_workers

    @staticmethod
    def _concatenate_result(result_list):
        # Recursively concatenate the per-chunk results along the batch axis:
        # ndarrays are concatenated, None stays None, and tuples are handled
        # element-wise
        if isinstance(result_list[0], np.ndarray):
            return np.concatenate(result_list, axis=0)
        elif result_list[0] is None:
            return None
        elif isinstance(result_list[0], tuple) and len(result_list[0]) > 0:
            result_tuple = ()
            for i in range(len(result_list[0])):
                result_tuple += (BatchParallelizeScoop._concatenate_result([x[i] for x in result_list]),)
            return result_tuple
        else:
            raise TypeError("The function wrapped by BatchParallelizeScoop"
                            " should return either ndarrays, None"
                            " or nested tuples of the above two types")

    @staticmethod
    def _reshape_results(results, shape):
        # Recursively reshape the concatenated results back to the original
        # batch shape, again handling ndarrays, None and nested tuples
        if isinstance(results, np.ndarray):
            return np.reshape(results, shape)
        elif results is None:
            return results
        elif isinstance(results, tuple) and len(results) > 0:
            result_tuple = ()
            for res in results:
                result_tuple += (BatchParallelizeScoop._reshape_results(res, shape),)
            return result_tuple
        else:
            raise TypeError("Invalid type to reshape (required either np.ndarray, or a nested tuple of the same)")
    def __call__(self, input_array: np.ndarray):
        """
        The logic is as follows: flatten all but the last dimension of input_array,
        split this array into `n_scoop_workers` chunks with each chunk being almost
        equal in size, run the function `function` on each of these chunks, and
        reassemble the output to the original batch shape.

        Parameters
        ----------
        input_array : np.ndarray
            The input array that is to be evaluated by the function `function`
        """
        assert input_array.size > 0, \
            "The input array to calculate the function on using BatchParallelizeScoop cannot be empty"
        old_shape = input_array.shape
        input_array = np.reshape(input_array, (-1, old_shape[-1]))
        if self.n_scoop_workers == 1:
            ret_score_array = self.func_partial(input_array)
        else:
            input_array_split = almost_equal_split(input_array, self.n_scoop_workers)
            input_array_split = [x for x in input_array_split if x.size > 0]
            result_list = list(futures.map(self.func_partial, input_array_split))
            ret_score_array = BatchParallelizeScoop._concatenate_result(result_list)
        return BatchParallelizeScoop._reshape_results(ret_score_array, old_shape[:-1])
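
# Example usage (a sketch): parallelize a batched scoring function over the
# available workers. `score_batch` is a hypothetical user function that maps an
# (n, d) array to an (n,) array of scores; it is not part of this module.
#
#     def score_batch(points):
#         return np.linalg.norm(points, axis=-1)
#
#     if __name__ == '__main__':
#         n_workers = get_n_scoop_workers(init_estimate=16)
#         parallel_score = BatchParallelizeScoop(n_workers, score_batch)
#         scores = parallel_score(np.random.rand(100, 20, 3))  # scores.shape == (100, 20)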


class ScoopSharedContext:
    """
    A context manager that shares constant values between SCOOP workers by
    pickling them (with dill) into files in a shared temporary directory.
    Values stored with `set_const` can be retrieved on any worker with
    `get_const`, optionally caching them per process.
    """

    __shared_cache__ = {}

    def __init__(self, temp_directory: str = None):
        # Resolve the default here rather than in the function signature so
        # that merely importing this module does not fail when CLUSTER_TEMP is
        # unset
        if temp_directory is None:
            temp_directory = os.environ['CLUSTER_TEMP']
        self.temp_directory = temp_directory
        self.filename_map = {}

    def set_const(self, name, value):
        import dill
        from tempfile import NamedTemporaryFile
        with NamedTemporaryFile(mode="wb", dir=self.temp_directory, delete=False) as fout:
            dill.dump(value, fout, protocol=-1)
            self.filename_map[name] = fout.name

    def get_const(self, name, cache: bool = False):
        if name not in self.filename_map:
            raise KeyError(f"Found no shared variable named '{name}' in this shared_context")
        import dill
        cache_key = (self.temp_directory, self.filename_map[name])
        get_value_from_file = (not cache) or (cache_key not in ScoopSharedContext.__shared_cache__)
        if get_value_from_file:
            with open(self.filename_map[name], 'rb') as fin:
                value = dill.load(fin)
            if cache:
                ScoopSharedContext.__shared_cache__[cache_key] = value
        else:
            value = ScoopSharedContext.__shared_cache__[cache_key]
        return value

    @classmethod
    def clear_cache(cls):
        # Deleting entries while iterating over the dict raises a RuntimeError;
        # simply clear the whole per-process cache
        cls.__shared_cache__.clear()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # The stored filenames are already absolute paths, so remove them directly
        for v in self.filename_map.values():
            os.remove(v)
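
# Example usage (a sketch): broadcast a large read-only object to all workers
# without repeatedly pickling it into every task. `heavy_computation` is a
# hypothetical user function, and the example assumes the temporary directory
# (CLUSTER_TEMP by default) is visible to every worker, e.g. on a shared
# filesystem.
#
#     from functools import partial
#
#     def heavy_computation(index, shared_context):
#         big_table = shared_context.get_const('big_table', cache=True)
#         return big_table[index].sum()
#
#     if __name__ == '__main__':
#         with ScoopSharedContext() as ctx:
#             ctx.set_const('big_table', np.random.rand(10000, 100))
#             worker_fn = partial(heavy_computation, shared_context=ctx)
#             results = list(futures.map(worker_fn, range(10)))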