Instantly share code, notes, and snippets.
Last active
June 24, 2019 06:28
-
Star
0
(0)
You must be signed in to star a gist -
Fork
0
(0)
You must be signed in to fork a gist
-
Save austospumanto/63d8e3f3b75626999496977806c712a6 to your computer and use it in GitHub Desktop.
SharedMemoryManager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
System/Runtime Requirements:
    Python 3.7
    Linux / Mac

Must pip install:
    numpy, pandas

Must build from source:
    shared_memory (https://github.com/SleepProgger/py_shared_memory)
    NOTE: Make sure to comment out the libraries=["rt"] extensions keyword
    argument in setup.py to be able to build on Mac

Must pip install to run the tests:
    seaborn
"""
from __future__ import annotations

import dataclasses as dc
import json
import multiprocessing as mp
import time
from typing import ClassVar, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import shared_memory as sm
# Optional list of axis-level names (e.g. ``df.index.names``) recorded next to
# a shared array; ``None`` when the array was shared without names.
NamesT = Optional[List[str]]
class GlobalArrayMetadata:
    """Per-process registry of the metadata needed to rebuild shared arrays.

    A shared-memory segment is only a flat byte buffer, so the shape, dtype
    and optional axis names of each shared array are recorded here, keyed by
    segment name.  The registries are ordinary class attributes and are NOT
    kept in shared memory themselves: a process can only read an entry if
    its copy of this class already contains it.
    """

    shmname2shape: ClassVar[Dict[str, Tuple[int, ...]]] = {}
    shmname2dtype: ClassVar[Dict[str, np.dtype]] = {}
    shmname2names: ClassVar[Dict[str, NamesT]] = {}

    @staticmethod
    def _lookup(registry: dict, kind: str, shmname: str):
        """Return ``registry[shmname]``; raise a descriptive ``KeyError`` if absent."""
        try:
            # Indexing (not ``.get``) because ``None`` is a legitimate stored value.
            return registry[shmname]
        except KeyError:
            raise KeyError(
                f"no {kind} registered for shared array {shmname!r} in this process"
            ) from None

    @classmethod
    def set_shape(cls, shmname: str, shape: Tuple[int, ...]) -> None:
        """Record ``shape`` for the segment called ``shmname``."""
        cls.shmname2shape[shmname] = shape

    @classmethod
    def set_dtype(cls, shmname: str, dtype: np.dtype) -> None:
        """Record ``dtype`` for the segment called ``shmname``."""
        cls.shmname2dtype[shmname] = dtype

    @classmethod
    def set_names(cls, shmname: str, names: NamesT) -> None:
        """Record the (possibly ``None``) axis names for ``shmname``."""
        cls.shmname2names[shmname] = names

    @classmethod
    def get_shape(cls, shmname: str) -> Tuple[int, ...]:
        """Return the recorded shape; raise ``KeyError`` if none was recorded."""
        return cls._lookup(cls.shmname2shape, "shape", shmname)

    @classmethod
    def get_dtype(cls, shmname: str) -> np.dtype:
        """Return the recorded dtype; raise ``KeyError`` if none was recorded."""
        return cls._lookup(cls.shmname2dtype, "dtype", shmname)

    @classmethod
    def get_names(cls, shmname: str) -> NamesT:
        """Return the recorded names; raise ``KeyError`` if none were recorded."""
        return cls._lookup(cls.shmname2names, "names", shmname)
@dc.dataclass(frozen=True)
class SharedDFArrNames:
    """Names of the shared-memory segments that together hold one DataFrame.

    Attributes:
        columns: Segment holding the column labels.
        index: Segment holding the index labels.
        values: Segments holding the cell values.  One entry (a single 2-D
            block) when every column has the same dtype, otherwise one entry
            per column.
        dtypes: Segment holding the per-column dtypes.
    """

    columns: str
    index: str
    values: List[str]
    dtypes: str
def ensure_vanilla_forking() -> None:
    """Make sure ``multiprocessing`` starts child processes with ``fork``.

    If no start method has been chosen yet, ``"fork"`` is selected and a
    notice is printed.  If one has already been chosen it must be ``"fork"``.

    Raises:
        AssertionError: If a start method other than ``"fork"`` is in effect.
    """
    method = mp.get_start_method(allow_none=True)
    if method is None:
        print('Executing `multiprocessing.set_start_method("fork")`')
        mp.set_start_method("fork")
    elif method != "fork":
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped when Python runs with ``-O``.
        raise AssertionError(
            f"multiprocessing start method must be 'fork', got {method!r}"
        )
class SharedMemoryManager:
    """Create, look up and clean up shared-memory backed arrays and DataFrames.

    The manager tracks every segment it creates or attaches to.  Used as a
    context manager it closes all of them on exit; a manager built with
    ``is_child=False`` (the default) additionally unlinks them, so only one
    process should use that mode for a given segment.

    Shape/dtype/names metadata is kept in ``GlobalArrayMetadata`` (plain
    class attributes, not shared memory), which is why the constructor
    insists on the ``fork`` start method.
    """

    def __init__(self, is_child: bool = False):
        """
        Args:
            is_child: When true, ``shutdown`` only closes segments and never
                unlinks them.
        """
        self.is_child = is_child
        self.shmname2shm = dict()
        # So the class attrs of `GlobalArrayMetadata` are readable in child procs
        ensure_vanilla_forking()

    def __enter__(self) -> SharedMemoryManager:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised inside the ``with`` propagates.
        self.shutdown()

    def shutdown(self):
        """Close every tracked segment and, unless ``is_child``, unlink it.

        Segments are removed from the tracking dict as they are processed, so
        calling ``shutdown`` again is a no-op.  If closing a segment raises,
        unlinking is still attempted before the error propagates.
        """
        while self.shmname2shm:
            _, shm = self.shmname2shm.popitem()
            try:
                shm.close()
            finally:
                if not self.is_child:
                    shm.unlink()

    def SharedMemory(self, name=None, create=False, size=0) -> sm.SharedMemory:
        """Open or create a segment via ``sm.SharedMemory`` and track it.

        The segment is stored under ``shm.name`` so ``shutdown`` can release it.
        """
        shm = sm.SharedMemory(name=name, create=create, size=size)
        self.shmname2shm[shm.name] = shm
        return shm

    @classmethod
    def set_shape(cls, name: str, shape: Tuple[int, ...]) -> None:
        """Record ``shape`` for segment ``name`` in ``GlobalArrayMetadata``."""
        GlobalArrayMetadata.set_shape(name, shape)

    @classmethod
    def set_dtype(cls, name: str, dtype: np.dtype) -> None:
        """Record ``dtype`` for segment ``name`` in ``GlobalArrayMetadata``."""
        GlobalArrayMetadata.set_dtype(name, dtype)

    @classmethod
    def set_names(cls, shmname: str, names: NamesT) -> None:
        """Record axis ``names`` for ``shmname`` in ``GlobalArrayMetadata``."""
        GlobalArrayMetadata.set_names(shmname, names)

    @classmethod
    def get_shape(cls, name: str) -> Tuple[int, ...]:
        """Return the shape recorded for segment ``name``."""
        return GlobalArrayMetadata.get_shape(name)

    @classmethod
    def get_dtype(cls, name: str) -> np.dtype:
        """Return the dtype recorded for segment ``name``."""
        return GlobalArrayMetadata.get_dtype(name)

    @classmethod
    def get_names(cls, shmname: str) -> NamesT:
        """Return the axis names recorded for ``shmname`` (may be ``None``)."""
        return GlobalArrayMetadata.get_names(shmname)

    def get_shared_array_names(self, shmname: str) -> NamesT:
        """Instance-level alias of ``get_names``."""
        return self.get_names(shmname)

    def get_shared_ndarray(self, shmname: str, shape=None, dtype=None) -> np.ndarray:
        """Return an ndarray viewing the buffer of segment ``shmname``.

        Args:
            shmname: Name of the segment.
            shape: Shape of the view; looked up in the metadata registry
                when ``None``.
            dtype: Dtype of the view; looked up in the metadata registry
                when ``None``.

        Returns:
            An array backed directly by the segment's buffer (no copy).
        """
        # Reuse a handle this manager already tracks; re-opening would replace
        # (and thereby drop) the tracked handle for the same name.
        shm = self.shmname2shm.get(shmname)
        if shm is None:
            shm = self.SharedMemory(name=shmname, create=False)
        # Explicit ``is None`` tests: a valid argument can be falsy (the 0-d
        # shape ``()``, and ``np.dtype`` instances on some NumPy versions).
        return np.ndarray(
            self.get_shape(shmname) if shape is None else shape,
            dtype=self.get_dtype(shmname) if dtype is None else dtype,
            buffer=shm.buf,
        )

    def get_shared_dataframe(self, shm_names: SharedDFArrNames) -> pd.DataFrame:
        """Rebuild a DataFrame from the segments named in ``shm_names``.

        When all columns share one dtype the frame is built with
        ``copy=False`` on top of the single shared 2-D values block;
        otherwise it is assembled from one shared array per column.
        """
        columns_arr = self.get_shared_ndarray(shm_names.columns)
        index_arr = self.get_shared_ndarray(shm_names.index)
        values_arrs = [self.get_shared_ndarray(shmname) for shmname in shm_names.values]
        dtypes_arr = self.get_shared_ndarray(shm_names.dtypes)

        if len(set(dtypes_arr)) == 1:
            # The shared dataframe's values are editable in-place.
            # Update the underlying shared np.ndarray through `df.iloc[.., ..] = ..`
            df = pd.DataFrame(
                values_arrs[0],
                columns=columns_arr,
                index=index_arr,
                copy=False,
                dtype=dtypes_arr[0],
            )
        else:
            values = dict(zip(columns_arr, values_arrs))
            df = pd.DataFrame(values, columns=columns_arr, index=index_arr)
            df = df.astype(dict(zip(columns_arr, dtypes_arr)), copy=False)

        df.columns.names = self.get_shared_array_names(shm_names.columns)
        df.index.names = self.get_shared_array_names(shm_names.index)
        return df

    def share_ndarray(self, arr: np.ndarray, names: NamesT = None) -> str:
        """Copy ``arr`` into a new segment and register its metadata.

        Args:
            arr: Array to copy into shared memory.
            names: Optional axis names stored alongside the array.

        Returns:
            The name of the newly created segment.
        """
        # NOTE(review): for object-dtype arrays only the object pointers are
        # copied into the segment; presumably this is usable solely from
        # forked children of this process - TODO confirm.
        # Request at least one byte so empty arrays never ask for a
        # zero-length segment.
        new_shm = self.SharedMemory(create=True, size=max(arr.nbytes, 1))
        filler = np.ndarray(arr.shape, dtype=arr.dtype, buffer=new_shm.buf)
        # ``[...]`` (rather than ``[:]``) also works for 0-d arrays.
        filler[...] = arr
        self.set_shape(new_shm.name, arr.shape)
        self.set_dtype(new_shm.name, arr.dtype)
        self.set_names(new_shm.name, names)
        return new_shm.name

    def share_dataframe(self, df: pd.DataFrame) -> SharedDFArrNames:
        """Copy ``df`` into shared memory and return the segment names.

        Column labels, index labels and dtypes each get their own segment.
        Values go into a single 2-D segment when every column has the same
        dtype, otherwise into one segment per column.
        """
        columns_arr = df.columns.to_numpy()
        index_arr = df.index.to_numpy()
        dtypes_arr = df.dtypes.to_numpy()

        columns_name = self.share_ndarray(columns_arr, names=df.columns.names)
        index_name = self.share_ndarray(index_arr, names=df.index.names)
        dtypes_name = self.share_ndarray(dtypes_arr)

        if len(set(dtypes_arr)) == 1:
            # The shared dataframe's values will be editable in-place.
            # Update the underlying shared np.ndarray through `df.iloc[.., ..] = ..`
            values_names = [self.share_ndarray(df.values)]
        else:
            values_names = [
                self.share_ndarray(df.iloc[:, j].to_numpy()) for j in range(df.shape[1])
            ]

        return SharedDFArrNames(
            columns=columns_name,
            index=index_name,
            values=values_names,
            dtypes=dtypes_name,
        )
class Tests: | |
@staticmethod | |
def p_handle_shared_array(name: str) -> None: | |
with SharedMemoryManager(is_child=True) as smm: | |
names = smm.get_shared_array_names(name) | |
arr = smm.get_shared_ndarray(name) | |
print(names) | |
print(arr) | |
assert names is None | |
np.testing.assert_array_equal(arr, Tests.test__share_array.arr) | |
@staticmethod | |
def test__share_array() -> None: | |
arr = np.array([[1, 2, 3], [4, 5, 6]], dtype=np.int32) | |
with SharedMemoryManager() as smm: | |
Tests.test__share_array.arr = arr | |
p = mp.Process( | |
target=Tests.p_handle_shared_array, | |
kwargs=dict(name=smm.share_ndarray(arr)), | |
) | |
p.start() | |
p.join() | |
p.close() | |
@staticmethod | |
def p_handle_shared_dataframe(names: SharedDFArrNames) -> None: | |
with SharedMemoryManager(is_child=True) as smm: | |
df: pd.DataFrame = smm.get_shared_dataframe(names) | |
print(df.head()) | |
pd.testing.assert_frame_equal(df, Tests._test__share_dataframe.df) | |
@staticmethod | |
def test__share_dataframe() -> None: | |
import seaborn as sns | |
Tests._test__share_dataframe(sns.load_dataset("titanic")) | |
Tests._test__share_dataframe(sns.load_dataset("titanic").set_index("sex")) | |
Tests._test__share_dataframe(sns.load_dataset("iris")) | |
Tests._test__share_dataframe(sns.load_dataset("iris").set_index("species")) | |
@staticmethod | |
def test__sub1_dataframe() -> None: | |
import seaborn as sns | |
Tests._test__sub1_dataframe(sns.load_dataset("titanic").select_dtypes(float)) | |
Tests._test__sub1_dataframe(sns.load_dataset("iris").select_dtypes(float)) | |
@staticmethod | |
def _test__share_dataframe(df: pd.DataFrame) -> None: | |
with SharedMemoryManager() as smm: | |
Tests._test__share_dataframe.df = df | |
p = mp.Process( | |
target=Tests.p_handle_shared_dataframe, | |
kwargs=dict(names=smm.share_dataframe(df)), | |
) | |
p.start() | |
p.join() | |
p.close() | |
@staticmethod | |
def p_sub1_shared_dataframe(names: SharedDFArrNames) -> None: | |
with SharedMemoryManager(is_child=True) as smm: | |
df: pd.DataFrame = smm.get_shared_dataframe(names) | |
print(df.head()) | |
print(df.values.dtype) | |
df.iloc[:, :] = df.iloc[:, :] - 1 | |
pd.testing.assert_frame_equal( | |
df, Tests._test__sub1_dataframe.df - 1, check_less_precise=True | |
) | |
@staticmethod | |
def _test__sub1_dataframe(df: pd.DataFrame) -> None: | |
assert len(set(df.dtypes.to_numpy())) == 1 | |
with SharedMemoryManager() as smm: | |
Tests._test__sub1_dataframe.df = df | |
df_names = smm.share_dataframe(df) | |
p = mp.Process( | |
target=Tests.p_sub1_shared_dataframe, kwargs=dict(names=df_names) | |
) | |
p.start() | |
p.join() | |
p.close() | |
pd.testing.assert_frame_equal( | |
smm.get_shared_dataframe(df_names), df - 1, check_less_precise=True | |
) | |
@staticmethod | |
def test_store_result_chunked(): | |
# Must have my `processit` module to run this test. | |
# Download from: https://gist.github.com/austospumanto/6205276f84cd4dde38f3ce17dddccdb3 | |
from .processit import processit | |
njobs = 6 | |
size = int(1e8) | |
ncols = 10000 | |
shape = (size // ncols, ncols) | |
df_input = pd.DataFrame(np.arange(0, size, dtype=np.float64).reshape(shape)) | |
########## | |
# NORMAL # | |
########## | |
expected_time = time.time() | |
expected = df_input.skew(axis=1).to_numpy() | |
expected_time = time.time() - expected_time | |
############ | |
# PARALLEL # | |
############ | |
parallel_time = time.time() | |
parallel_actual = np.array( | |
processit( | |
[ | |
dict(target=Tests.p_skew_of_ith_row, args=(i, df_input)) | |
for i in df_input.index | |
], | |
max_nprocs=njobs, | |
), | |
dtype=np.float64, | |
) | |
parallel_time = time.time() - parallel_time | |
################### | |
# PARALLEL SHARED # | |
################### | |
parallel_shared_time = time.time() | |
arr_output = np.empty(df_input.shape[0], dtype=np.float64) | |
with SharedMemoryManager() as smm: | |
df_input_names = smm.share_dataframe(df_input) | |
arr_output_name = smm.share_ndarray(arr_output) | |
Tests.p_skew_of_ith_row_shared.df_input_names = df_input_names | |
Tests.p_skew_of_ith_row_shared.arr_output_name = arr_output_name | |
processit( | |
[ | |
dict(target=Tests.p_skew_of_ith_row_shared, args=(i,)) | |
for i in df_input.index | |
], | |
max_nprocs=njobs, | |
) | |
parallel_shared_actual = smm.get_shared_ndarray(arr_output_name).copy() | |
parallel_shared_time = time.time() - parallel_shared_time | |
########################### | |
# PARALLEL SHARED CHUNKED # | |
########################### | |
parallel_shared_chunked_time = time.time() | |
with SharedMemoryManager() as smm: | |
df_input_names = smm.share_dataframe(df_input) | |
Tests.p_skew_chunked.df_input_names = df_input_names | |
map_args = np.array_split( | |
ary=df_input.index.to_numpy(), | |
indices_or_sections=(len(df_input) // njobs) + 1, | |
) | |
arr_names = mp.Pool(njobs).map(Tests.p_skew_chunked, map_args, chunksize=1) | |
parallel_shared_chunked_actual = np.empty( | |
shape=(df_input.shape[0],), dtype=np.float64 | |
) | |
np.concatenate( | |
[ | |
smm.get_shared_ndarray( | |
name, shape=(map_args[i].size,), dtype=np.float64 | |
) | |
for i, name in enumerate(arr_names) | |
], | |
out=parallel_shared_chunked_actual, | |
) | |
parallel_shared_chunked_time = time.time() - parallel_shared_chunked_time | |
times = dict( | |
expected_time=expected_time, | |
parallel_time=parallel_time, | |
parallel_shared_time=parallel_shared_time, | |
parallel_shared_chunked_time=parallel_shared_chunked_time, | |
) | |
print(json.dumps(times, indent=4, sort_keys=True)) | |
np.testing.assert_array_almost_equal(expected, parallel_actual) | |
np.testing.assert_array_almost_equal(expected, parallel_shared_actual) | |
np.testing.assert_array_almost_equal(expected, parallel_shared_chunked_actual) | |
assert ( | |
parallel_time | |
< parallel_shared_chunked_time | |
< expected_time | |
< parallel_shared_time | |
) | |
@staticmethod | |
def p_skew_of_ith_row_shared(i: int) -> None: | |
df_input_names: SharedDFArrNames = Tests.p_skew_of_ith_row_shared.df_input_names | |
arr_output_name: str = Tests.p_skew_of_ith_row_shared.arr_output_name | |
with SharedMemoryManager(is_child=True) as smm: | |
df_in: pd.DataFrame = smm.get_shared_dataframe(shm_names=df_input_names) | |
arr_out: np.ndarray = smm.get_shared_ndarray(shmname=arr_output_name) | |
arr_out[i] = df_in.iloc[i, :].skew() | |
@staticmethod | |
def p_skew_of_ith_row(i: int, df_input: pd.DataFrame) -> np.float64: | |
return df_input.iloc[i, :].skew() | |
@staticmethod | |
def p_skew_chunked(ilocs: np.ndarray) -> str: | |
df_input_names: SharedDFArrNames = Tests.p_skew_chunked.df_input_names | |
with SharedMemoryManager(is_child=True) as smm: | |
df_in: pd.DataFrame = smm.get_shared_dataframe(shm_names=df_input_names) | |
arr_out: np.ndarray = df_in.iloc[ilocs, :].skew(axis=1).to_numpy() | |
return smm.share_ndarray(arr=arr_out) | |
if __name__ == "__main__":
    # Script entry point: run the benchmark first, then the round-trip tests.
    Tests.test_store_result_chunked()
    Tests.test__share_array()
    Tests.test__share_dataframe()
    Tests.test__sub1_dataframe()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
py_shared_memory
Make sure to comment out the
libraries=["rt"]
extensions keyword argument in setup.py
to be able to build on Mac