Last active
October 13, 2019 09:17
-
-
Save ogrisel/a2b0e5ae4987a398caa7f9277cb3b90a to your computer and use it in GitHub Desktop.
Draft use of pickle protocol 5 (PEP 574) for zero-copy numpy array pickling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copyreg
import gc
import io
import os
import time
from multiprocessing import get_context
from pickle import Pickler, load

import numpy as np

try:
    from pickle import PickleBuffer
except ImportError:
    # PickleBuffer (pickle protocol 5, PEP 574) is not provided by this
    # interpreter's pickle module; protocol 5 runs are skipped in that case.
    PickleBuffer = None


def monitor_worker(pid, queue, stop_event, delay=0.05):
    """Record the peak resident memory of process ``pid`` and report it.

    The resident set size (RSS) of the process is sampled every ``delay``
    seconds until ``stop_event`` is set. The largest value observed is then
    put on ``queue``.

    Parameters
    ----------
    pid : int
        Identifier of the process to monitor.
    queue : object with a ``put`` method
        Receives the peak RSS value once monitoring stops.
    stop_event : object with a ``wait(timeout=...)`` method
        Signals that monitoring should stop.
    delay : float, default=0.05
        Time in seconds to wait between two measurements.
    """
    # Local import: psutil is only needed by this worker function.
    from psutil import Process

    p = Process(pid)
    peak = 0

    # Make measurements every 'delay' seconds until we receive the stop event
    # (wait() returns a falsy value when it times out without the event set):
    while not stop_event.wait(timeout=delay):
        peak = max(peak, p.memory_info().rss)

    # Make one last measurement in case memory has increased just before
    # receiving the stop event:
    peak = max(peak, p.memory_info().rss)
    queue.put(peak)


class PeakMemoryMonitor:
    """Context manager that prints the peak RSS of the current process.

    On entry, a helper process running ``monitor_worker`` is started to
    sample the memory usage of the current process. On a clean exit, the
    peak value reported by the helper is stored in ``self.peak`` and printed
    in GB. If the managed block raised, the helper is terminated and the
    exception propagates.
    """

    # Queue, Event and Process are all created from the 'spawn' context.
    _mp = get_context('spawn')

    def __enter__(self):
        """Start the monitoring helper process and return ``self``."""
        pid = os.getpid()
        self.queue = q = self._mp.Queue()
        self.stop_event = e = self._mp.Event()
        self.worker = self._mp.Process(target=monitor_worker, args=(pid, q, e))
        self.worker.start()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        """Stop the helper process, then collect and print the peak usage."""
        self.stop_event.set()
        if exc_type is not None:
            # The measurement is meaningless after a failure: stop the helper
            # right away, reap it, and let the exception propagate.
            self.worker.terminate()
            self.worker.join()
            return False

        # Fetch the result *before* joining: a process that has put items on
        # a queue may not exit until those items have been consumed.
        self.peak = self.queue.get()
        self.worker.join()
        print("peak memory usage: {:.3f} GB".format(self.peak / 1e9))
        return True


def _array_from_buffer(buffer, dtype, shape): | |
return np.frombuffer(buffer, dtype=dtype).reshape(shape) | |
def reduce_ndarray_pickle5(a):
    """Reducer for numpy arrays that avoids copying the data when possible.

    For C-contiguous arrays without object fields, the array memory is
    wrapped in a ``PickleBuffer`` and rebuilt with ``_array_from_buffer``.
    Any other array, or any array when ``PickleBuffer`` is unavailable,
    is reduced with the generic ``a.__reduce__()``.

    Returns a tuple in the format expected from a pickle reduce function.
    """
    # This reducer assumes protocol 5 as currently there is no way to register
    # protocol-aware reduce function in the global copyreg dispatch table.
    if PickleBuffer is None or a.dtype.hasobject or not a.flags.c_contiguous:
        # Fall-back to generic method. This also covers interpreters where
        # the PickleBuffer import failed, instead of calling None.
        return a.__reduce__()

    # No-copy pickling for C-contiguous arrays and protocol 5
    return _array_from_buffer, (PickleBuffer(a), a.dtype, a.shape), None


def _make_pickler(file, protocol):
    """Return a Pickler writing to ``file`` with the given ``protocol``.

    For protocol 5 and above, the pickler gets its own copy of the copyreg
    dispatch table in which ``np.ndarray`` is reduced with
    ``reduce_ndarray_pickle5``.
    """
    pickler = Pickler(file, protocol=protocol)
    if protocol >= 5:
        pickler.dispatch_table = copyreg.dispatch_table.copy()
        pickler.dispatch_table[np.ndarray] = reduce_ndarray_pickle5
    return pickler


if __name__ == "__main__":
    print("# Part 1: in-memory dump speed\n")
    # See: https://github.com/numpy/numpy/issues/7544
    for protocol in (4, 5):
        if PickleBuffer is None and protocol == 5:
            # Protocol 5 cannot be benchmarked without PickleBuffer.
            continue

        data = np.random.randint(0, 255, dtype='u1', size=1000000000)
        t0 = time.time()
        p = _make_pickler(io.BytesIO(), protocol)
        p.dump(data)
        print(f"protocol {protocol} in-memory dump of 1GB in"
              f" {time.time() - t0:0.3}s")
        # Drop the references and collect before the next large allocation.
        del p
        del data
        gc.collect()

    print("\n# Part 2: dumping and loading to / from files\n")
    for protocol in (4, 5):
        if PickleBuffer is None and protocol == 5:
            continue

        filename = f'blob_protocol_{protocol}.bin'
        print('Allocating original array...')
        with PeakMemoryMonitor():
            data = np.ones(int(1e9 / 8))

        print(f'Dumping array to {filename}...')
        with PeakMemoryMonitor():
            t0 = time.time()
            with open(filename, 'wb') as f:
                p = _make_pickler(f, protocol)
                p.dump(data)
                del p
            print(f'done in {time.time() - t0:0.3}s')

        del data
        gc.collect()

        print(f'Loading array back from {filename}...')
        with PeakMemoryMonitor():
            t0 = time.time()
            with open(filename, 'rb') as f:
                data = load(f)
            print(f'done in {time.time() - t0:0.3}s')

        del data
        gc.collect()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Results using the pickle5 backport for Python 3.6:
Basically, it's possible to get zero-copy pickling of contiguous numpy arrays.