|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
|
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
|
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
|
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY |
|
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, |
|
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE |
|
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|
''' |
|
Demo for accessing a read-only shared array for high-performance |
|
numpy workloads. |
|
|
|
:copyright: (c) 2016 Alex Huszagh |
|
:license: MIT |
|
''' |
|
|
|
import ctypes |
|
import multiprocessing |
|
import numpy as np |
|
|
|
# ctypes pointer-to-double type, used to reinterpret a raw integer address
DOUBLE_P = ctypes.POINTER(ctypes.c_double)
|
|
|
|
|
def target(address, shape):
    '''
    Rebuild a numpy array over an existing buffer from its raw address.

    :param address: integer address of the first element of the buffer
    :param shape: shape to give the reconstructed array
    '''

    # Reinterpret the integer address as a pointer to double and let numpy
    # wrap that memory in place. With true multiprocessing, changes
    # written through this array are not reflected in the original array;
    # with threading, they are.
    view = np.ctypeslib.as_array(ctypes.cast(address, DOUBLE_P), shape=shape)
    # Show that the reconstructed array starts at the same address as the
    # original. Be careful though: any intermediates, such as (view > x)
    # or view + 1, create new data structures and can potentially
    # overload your system.
    print(view.ctypes.data, 'Target Address')
|
|
|
|
|
def main():
    '''
    Construct an initial template array and pass it by reference.

    The array's raw address is handed to 10 child processes, each of which
    rebuilds an array on top of it and prints the address it sees.

    A raw address is only meaningful in a child that inherits the parent's
    address space, so the children are created with the ``fork`` start
    method explicitly instead of relying on the platform default (which
    is ``spawn`` or ``forkserver`` on several platforms/Python versions).

    :raises ValueError: if the ``fork`` start method is not available
        on this platform
    '''

    shape = (30000, 30000)
    # 30000 * 30000 doubles at 8 bytes each: about 7.2 GB of memory
    array = np.random.random(shape)
    address = array.ctypes.data
    print(address, 'Original Address')

    # Request fork explicitly: with spawn/forkserver the child does not
    # share the parent's memory layout and the address would be invalid.
    context = multiprocessing.get_context('fork')

    jobs = []
    # Giving each of the 10 children its own copy of the array would need
    # roughly 72 GB on top of the original; only the address is passed.
    for _ in range(10):
        p = context.Process(target=target, args=(address, shape))
        jobs.append(p)
        p.start()

    # `array` stays referenced until every child has finished, so the
    # buffer behind `address` is not released while children use it.
    for job in jobs:
        job.join()
|
|
|
|
|
# Run the demo only when executed as a script, not when imported
# (child processes may re-import this module).
if __name__ == '__main__':
    main()