@martindurant
Last active July 8, 2023 14:55
import mmap
import tempfile
import weakref
import uuid
import SharedArray as sa
import numpy as np
"""https://gitlab.com/tenzing/shared-array"""
def from_shm(name):
    return sa.attach(name)


def try_remove(u):
    try:
        sa.delete(u)
    except FileNotFoundError:
        # was already removed
        pass
class SharedWrappedArray:
    def __init__(self, data=None, u=None):
        if data is None and u is None:
            raise ValueError("Must supply data or shm handle")
        self.data = data
        self.u = u
        self.m = sa.attach(u) if u else None

    def __getattr__(self, item):
        if item in ["__array__", "__array_function__", "__array_interface__",
                    "__array_ufunc__"]:
            if self.data is not None:
                return getattr(self.data, item)
            return getattr(self.m, item)
        raise AttributeError(item)

    def __dask_tokenize__(self):
        return f"shared_np_{self._to_shm()}"

    def _to_shm(self):
        if self.m is None:
            u = uuid.uuid4().hex[:8]
            # copy - shared version is read-write, but does not change original
            self.m = sa.create(u, self.data.shape, self.data.dtype)
            self.m[:] = self.data
            weakref.finalize(self.m, lambda: try_remove(u))
            self.u = u
        return self.u

    def __reduce__(self):
        self._to_shm()
        return from_shm, (self.u, )
def remake(fn, dtype, shape):
    with open(fn, 'r+b') as f:
        m = mmap.mmap(f.fileno(), 0)
        return np.frombuffer(m, dtype=dtype).reshape(shape)
class SharedMMapArray:
    def __init__(self, data=None):
        if data is None:
            raise ValueError("Must supply data")
        self.data = data
        self.fn = None

    def __dask_tokenize__(self):
        return f"shared_np_{self._to_shm()}"

    def _to_shm(self):
        if self.fn is None:
            self.fn = tempfile.mktemp()
            with open(self.fn, "wb") as f:
                self.data.tofile(f)
        # the temp file name doubles as the token
        return self.fn

    def __reduce__(self):
        self._to_shm()
        return remake, (self.fn, self.data.dtype, self.data.shape)
def simple_bench():
    import dask.distributed
    import time

    x = np.ones((1000, 100000))
    x2 = SharedWrappedArray(x)
    x3 = SharedMMapArray(x)
    with dask.distributed.Client(n_workers=1) as client:
        t9 = time.time()
        # mmap wrapper: first call writes the temp file, second only re-maps it
        assert client.submit(lambda x: x.sum(), x3).result() == x.sum()
        t8 = time.time()
        assert client.submit(lambda x: x.sum(), x3).result() == x.sum()
        t0 = time.time()
        # plain numpy array: serialized and copied to the worker
        assert client.submit(lambda x: x.sum(), x).result() == x.sum()
        t1 = time.time()
        # shm wrapper: first call copies into shared memory, second only attaches
        assert client.submit(lambda x: x.sum(), x2).result() == x.sum()
        t2 = time.time()
        assert client.submit(lambda x: x.sum(), x2).result() == x.sum()
        t3 = time.time()
        f = client.scatter(x)
        assert client.submit(lambda x: x.sum(), f).result() == x.sum()
        t4 = time.time()
        print("mmap 1:", t8 - t9)
        print("mmap 2:", t0 - t8)
        print("numpy time:", t1 - t0)
        print("shared time, first:", t2 - t1)
        print("shared time, second:", t3 - t2)
        print("scatter time:", t4 - t3)
@martindurant (Author)

Times:

mmap 1: 1.8584792613983154
mmap 2: 0.37310290336608887
numpy time: 7.0672080516815186
shared time, first: 0.8365190029144287
shared time, second: 0.40976500511169434
scatter time: 2.3145029544830322

So writing the file for mmap takes more time than writing to shm (it shows up only on the first call), but reading is equivalent.
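
One way to separate the one-time write cost from the read cost, without a cluster at all, is to time the pickle round-trip of the two wrappers directly: dumps pays the shm/file write on the first call, and loads corresponds to what a worker does when it attaches. A rough sketch using the classes above (assumes SharedArray is installed; numbers will of course differ):

import pickle
import time

x = np.ones((1000, 100000))
for wrapper in (SharedWrappedArray(x), SharedMMapArray(x)):
    t0 = time.time()
    payload = pickle.dumps(wrapper)   # first call pays the shm/file write
    t1 = time.time()
    arr = pickle.loads(payload)       # attach/mmap, as a worker would
    t2 = time.time()
    print(type(wrapper).__name__, "dumps:", t1 - t0, "loads:", t2 - t1)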

@alexis-intellegens

On Dask this would duplicate memory regardless of the mmap, right?

@martindurant (Author)

No, you should get a pointer to the same OS-cached block in memory.
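
For the mmap case specifically, mapping the same file twice gives two views over the same page-cache pages. A small sketch of that (not from the gist; it uses numpy's memmap for brevity), showing a write through one view appearing in the other without any copy:

import numpy as np
import tempfile

fn = tempfile.mktemp()
np.zeros(4).tofile(fn)
a = np.memmap(fn, dtype="float64", mode="r+")   # writable view of the file
b = np.memmap(fn, dtype="float64", mode="r")    # second, read-only view
a[0] = 42.0
a.flush()
assert b[0] == 42.0   # both views map the same OS-cached pages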

@alexis-intellegens commented Apr 4, 2021

Strange,

I created a 4 GB array by copying your code and get the following:

distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting

I'm using WSL2 on Win10; could that be the issue? The Ubuntu instance tells me it's using 4 GB, but Task Manager on Windows tells me the Linux instance is using 8 GB (I have 16 GB).

@alexis-intellegens

import dask.distributed
from dask.distributed import Client
import numpy as np
import time
x = np.random.random(size=[2_000_000, 200])
x2 = SharedWrappedArray(x)
#     x3 = SharedMMapArray(x)

client = Client(processes=True, threads_per_worker=2,
                n_workers=4)

models = []
for i in range(8):
    models.append(client.submit(lambda x: x.sum(), x2))

client.gather(models)

@alexis-intellegens

How much memory should this consume? The array is around 4 GB; if it weren't duplicated, my workers wouldn't crash, right?

@martindurant (Author) commented Apr 4, 2021 via email

@martindurant (Author)

If Dask is asking the workers for the number of bytes contained in a variable, like sys.getsizeof, then the array will appear to contribute to the memory footprint of each process, even if it isn't actually duplicated. I'm not sure how psutil's memory_info, which is what the nanny watches, handles this. I'm not certain how to tell the difference. Since this would be a new way to do things, it's worth finding out. Something similar must happen when multiple dask processes use one GPU.
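
One way to probe it, assuming psutil is available on the workers (a sketch, not something from the gist): compare rss, which counts shared resident pages in every process that maps them, with uss, which counts only the pages unique to each process.

import psutil

def mem_report():
    info = psutil.Process().memory_full_info()
    return {"rss": info.rss, "uss": info.uss}

# ask every worker for its numbers after the shared-array tasks have run
# (client is the dask Client from the snippet above); a large rss with a
# small uss would mean the pages are shared and only the accounting makes
# them look duplicated
client.run(mem_report)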

@alexis-intellegens

I see what you're saying. That could be happening. Are you able to reproduce the out of memory errors with code similar to mine?

@martindurant (Author)

I haven't tried yet, sorry.
