Created February 25, 2019 15:36
Save RomanSteinberg/b2631ce7863899b4a0145d05b1389659 to your computer and use it in GitHub Desktop.
Benchmark parallel execution
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Process, Queue, Event | |
from queue import Empty as QueueEmpty | |
from random import randint, seed | |
from time import monotonic as now | |
from datetime import timedelta | |
# Benchmark configuration.
TASKS_COUNT = 12  # how many test() runs each benchmark performs
ARRAY_SIZE = 2_000_000  # length of every generated input list
POOL_SIZE = 2  # number of worker processes for the multiprocessing benchmark
def mult(x, y):
    """Return the element-wise products of ``x`` and ``y``.

    Pairs are formed with ``zip``, so the result is as long as the shorter input.
    """
    return [left * right for left, right in zip(x, y)]
def gen(sz):
    """Return ``sz`` pseudo-random integers in the inclusive range 0..100.

    The module-level ``random`` generator is reseeded with 101 on every call,
    so repeated calls produce identical lists (and reset the global RNG state).
    """
    seed(101)
    numbers = [randint(0, 100) for _ in range(sz)]
    return numbers
def test():
    """Run one unit of benchmark work and return its result.

    Generates two ``ARRAY_SIZE``-long lists with ``gen`` (which reseeds the RNG
    each time, so both lists are identical), multiplies them element-wise and
    returns the sum of the products.
    """
    first = gen(ARRAY_SIZE)
    second = gen(ARRAY_SIZE)
    products = mult(first, second)
    return sum(products)
class TimeIt(object):
    """Decorator that times one call and prints the average time per task.

    The decorated function is expected to perform ``tasks_number`` units of
    work; the wall-clock duration of the whole call (measured with
    ``time.monotonic``) is divided by that count and printed.

    Args:
        tasks_number: number of tasks one call of the decorated function runs.
            Values below 1 are treated as 1 so the average never divides by zero.
    """

    def __init__(self, tasks_number=10):
        self.tasks_number = tasks_number

    def __call__(self, method):
        """Wrap ``method`` so each call is timed; its return value is passed through."""
        from functools import wraps

        # wraps() keeps the original __name__/__doc__ for introspection and logs.
        @wraps(method)
        def wrappee(*args, **kw):
            ts = now()
            result = method(*args, **kw)
            elapsed = now() - ts
            # Guard against tasks_number == 0, which previously raised
            # ZeroDivisionError after the timed work had already finished.
            ave_time = timedelta(seconds=elapsed / max(self.tasks_number, 1))
            print(f'Repeated {self.tasks_number}, average time is {ave_time} sec')
            return result

        return wrappee
# Sequential benchmark
def measure_sequential(test_value, tasks_number=10):
    """Run ``test()`` ``tasks_number`` times in this process and report timing.

    Every result is compared with ``test_value`` and mismatches are printed;
    the average time per run is printed by ``TimeIt``.
    """
    print(f'Started sequential execution of {tasks_number} tasks')

    def run_all():
        for index in range(tasks_number):
            outcome = test()
            if outcome != test_value:
                print(f'Step {index} produced wrong result {outcome}')

    timed_run = TimeIt(tasks_number=tasks_number)(run_all)
    timed_run()
# Multiprocessing benchmark
class Worker:
    """Task runner intended to execute ``run`` inside its own process.

    Tasks are zero-argument callables delivered through a
    ``multiprocessing.Queue``. Each result is compared with ``test_value`` and
    a mismatch is printed. Two ``multiprocessing.Event`` flags coordinate with
    the registering process: ``_is_free`` (set when the worker has drained its
    queue) and ``_stop_received`` (asks ``run`` to return).

    Args:
        test_value: the result every task is expected to return.
    """

    def __init__(self, test_value):
        self._test_value = test_value
        # Task currently held by run(); lives in the process executing run().
        self._task = None
        self._queue = Queue()
        self._is_free = Event()
        self._is_free.set()
        self._stop_received = Event()

    def run(self):
        """Execute queued tasks until ``stop`` is called, then reset the stop flag.

        A task that raises is reported and skipped rather than terminating the
        loop; otherwise ``_is_free`` would never be set again and anything
        polling ``is_free`` would wait forever.
        """
        while not self._stop_received.is_set():
            if self._task is None:
                try:
                    # The timeout lets the loop re-check the stop flag periodically.
                    self._task = self._queue.get(timeout=1)
                except QueueEmpty:
                    pass
            else:
                try:
                    res = self._task()
                except Exception as exc:
                    print(f'Worker {self} failed with {exc!r}')
                else:
                    if res != self._test_value:
                        print(f'Worker {self} produced wrong result {res}')
                finally:
                    self._task = None
                # NOTE: multiprocessing's Queue.empty() is documented as not
                # reliable, so this check can race with a concurrent register()
                # and report "free" while a task is still pending.
                if self._queue.empty():
                    # Nothing left to do: advertise the "free" state.
                    self._is_free.set()
        # Reset the stop flag so the worker is back in its initial state.
        self._stop_received.clear()

    def register(self, task):
        """Queue ``task`` for execution and mark the worker as busy."""
        self._queue.put(task)
        self._is_free.clear()

    def is_free(self):
        """Return True when the worker has no outstanding registered work."""
        return self._is_free.is_set()

    def stop(self):
        """Ask ``run`` to exit at its next loop check."""
        self._stop_received.set()
def measure_multiprocessing(test_value, tasks_number=10, pool_size=4):
    """Benchmark ``tasks_number`` calls of ``test()`` spread over worker processes.

    Starts ``pool_size`` processes running ``Worker.run`` plus one "jobs"
    process. The jobs process registers the tasks round-robin, then polls
    until every worker reports free, stopping each free worker. It prints the
    average time per task through ``TimeIt``. This function returns once all
    started processes have been joined.

    Args:
        test_value: result every ``test()`` call is expected to return.
        tasks_number: how many ``test()`` calls to distribute.
        pool_size: number of worker processes.

    NOTE: the "jobs" target is a nested, decorated function. The "spawn"
    start method cannot pickle such targets, so this requires the "fork"
    start method.
    """
    from time import sleep

    # Pause between polling rounds so the jobs process does not spin a CPU
    # core that the benchmarked worker processes need.
    poll_interval = 0.01

    @TimeIt(tasks_number=tasks_number)
    def measure(workers):
        for step in range(tasks_number):
            workers[step % len(workers)].register(test)
        all_stopped = False
        while not all_stopped:
            all_stopped = True
            for w in workers:
                if w.is_free():
                    w.stop()
                else:
                    all_stopped = False
            if not all_stopped:
                sleep(poll_interval)

    @TimeIt(tasks_number=1)
    def init(measure, pool_size, tasks_number, test_value):
        workers = [Worker(test_value) for _ in range(pool_size)]
        processes = [Process(target=w.run, name='worker') for w in workers]
        # One extra process registers the tasks and waits for completion.
        processes.append(Process(target=measure, name='jobs', args=[workers]))
        for p in processes:
            p.start()
        return processes

    print('Started initialization for multiprocessing')
    started_processes = init(measure, pool_size, tasks_number, test_value)
    print(f'Started {len(started_processes) - 1} process in multiprocessing mode to execute {tasks_number} '
          f'tasks and 1 to register tasks')
    for p in started_processes:
        p.join()
# Celery: no Celery-based benchmark is implemented in this script
if __name__ == '__main__':
    # Compute the reference result once; both benchmarks check each run against it.
    master_value = test()
    print('Master value:', master_value)
    measure_sequential(master_value, tasks_number=TASKS_COUNT)
    measure_multiprocessing(master_value, tasks_number=TASKS_COUNT, pool_size=POOL_SIZE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.