Skip to content

Instantly share code, notes, and snippets.

@satra
Last active December 17, 2015 20:49
Show Gist options
  • Save satra/5670453 to your computer and use it in GitHub Desktop.
Save satra/5670453 to your computer and use it in GitHub Desktop.
Testing multiprocessing with hdf5.
# -*- coding: utf-8 -*-
# <nbformat>3.0</nbformat>
# <codecell>
from glob import glob
import os
import numpy as np
# <codecell>
N = 5000 # number of rows and columns of data matrix
N_samples = 2 # number of samples
# <codecell>
import h5py as h5
import multiprocessing as mp
from functools import partial
print h5.version.version
print h5.version.api_version
# <codecell>
data_ids = range(N_samples)
template = 'S%s.h5'
for idx, data_id in enumerate(data_ids):
outfile = template % data_id
if os.path.exists(outfile):
continue
mat = np.random.rand(N, N)
f = h5.File(outfile, 'w')
f.create_dataset('corrmat', data=mat, compression=5)
f.close()
# <codecell>
fl = {}
for idx, data_id in enumerate(data_ids):
fl[data_id] = h5.File(template % data_id, 'r')
# <codecell>
def correlate_row(row_index, h5fl, ids, N):
row_mat = np.zeros((len(ids), N))
for idx, data_id in enumerate(ids):
row_mat[idx, :] = h5fl[data_id]['corrmat'][row_index, :]
return row_mat.mean(axis=0)
partial_func = partial(correlate_row, h5fl=fl, ids=data_ids, N=N)
# <markdowncell>
# Non multiprocessing result
# <codecell>
result = map(partial_func, (x for x in xrange(1)))
# <markdowncell>
# Multiprocessing result
# <codecell>
pool = mp.Pool(2) #(mp.cpu_count())
result = pool.map(partial_func, (x for x in xrange(1)))
pool.close()
pool.join()
# <codecell>
for _, fh in fl.items():
fh.close()
# <codecell>
$ python hdf5_test.py
2.0.0
1.8
Process PoolWorker-1:
Traceback (most recent call last):
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/pool.py", line 85, in worker
task = get()
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/queues.py", line 376, in get
return recv()
File "_objects.pyx", line 204, in h5py._objects.ObjectID.__cinit__ (h5py/_objects.c:2683)
TypeError: __cinit__() takes exactly 1 positional argument (0 given)
^C
Process PoolWorker-3:
Process PoolWorker-2:
Traceback (most recent call last):
Traceback (most recent call last):
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/pool.py", line 85, in worker
self.run()
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/pool.py", line 85, in worker
task = get()
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/queues.py", line 376, in get
task = get()
File "/Library/Frameworks/EPD64.framework/Versions/7.3/lib/python2.7/multiprocessing/queues.py", line 374, in get
return recv()
KeyboardInterrupt
racquire()
KeyboardInterrupt
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment