-
-
Save honzakral/699f74283298a49c0b3df7f0bd4cbda4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This is a simple benchmark that tests the performance of several | |
locking implementations. Each example implements a connection | |
pool that provides `get_connection()` to retrieve a connection | |
from the pool and `release()` to return a connection back to the | |
pool. | |
The `test()` function creates an instance of `pool_class` and | |
creates `num_threads` `threading.Thread` instances that simply call | |
`pool.get_connection()` and `pool.release()` repeatedly until | |
`num_seconds` has elapsed. | |
ListLockPool uses a `threading.Lock` to protect the critical | |
sections of `get_connection()` and `release()`. | |
ListConditionPool uses two `threading.Condition` objects, each using | |
the same `threading.Lock` to protect the critical sections of | |
`get_connections()` and `release()`. | |
ListConditionNotifyPool is identical to ListConditionPool except that | |
it also calls the relevant conditions `notify()` at the end of the | |
each critical section. This mimics `queue.Queue`'s implementation. | |
QueuePool uses the standard library's `queue.LifoQueue`. | |
Benchmarks: | |
All timings were performed on CPython 3.8.0 on OSX. | |
Using 10 threads for 10 seconds, each implementation roughly | |
performs at: | |
ListLockPool: 1x | |
ListConditionPool: 1.1x (always slightly faster than ListLockPool) | |
ListConditionNotifyPool: 4x | |
QueuePool: 3x | |
The ListConditionNotifyPool and QueuePool share a very similar | |
implementation. Python's LifoQueue class has more features | |
than what ListConditionNotifyPool needs and likely those extra bits | |
are the reason why QueuePool is slower than the stripped down | |
ListConditionNotifyPool. | |
However I do not understand why there is any difference between | |
the three List*Pool implementations. From looking at the | |
`threading.Condition` source code[0], the `threading.Condition.__enter__` | |
method simply calls its lock's `__enter__`. Therefore it seems to | |
me that ListLockPool and ListConditionPool should be more or less | |
identical. But from my observation ListConditionPool is always slightly | |
faster. | |
ListConditionNotifyPool is even more peculiar. The only difference | |
between ListCondtionPool and ListConditionNotifyPool is that the latter | |
calls `condition_object.notify()` at the end of each critiical section. | |
However there are no calls to `condition_object.wait()` and therefore | |
no "waiters" to wake up when calling `.notify()`. I don't understand | |
why simply calling `condition_object.notify()` provides a ~4x performance | |
improvement over ListConditionPool. | |
[0] https://github.com/python/cpython/blob/master/Lib/threading.py#L247 | |
""" | |
import threading | |
import time | |
from queue import LifoQueue, Empty, Full | |
class ListLockPool: | |
def __init__(self, max_connections): | |
self.max_connections = max_connections | |
self.created_connections = 0 | |
self.connections = [] | |
self.lock = threading.Lock() | |
def get_connection(self): | |
with self.lock: | |
try: | |
connection = self.connections.pop() | |
except IndexError: | |
connection = self.make_connection() | |
return connection | |
def make_connection(self): | |
if self.created_connections >= self.max_connections: | |
raise Exception('Too Many Connections') | |
self.created_connections += 1 | |
return object() | |
def release(self, connection): | |
with self.lock: | |
self.connections.append(connection) | |
def count(self): | |
return len(self.connections) | |
class ListConditionPool: | |
def __init__(self, max_connections): | |
self.max_connections = max_connections | |
self.created_connections = 0 | |
self.connections = [] | |
self.lock = threading.Lock() | |
self.not_empty = threading.Condition(self.lock) | |
self.not_full = threading.Condition(self.lock) | |
def get_connection(self): | |
with self.not_empty: | |
try: | |
connection = self.connections.pop() | |
except IndexError: | |
connection = self.make_connection() | |
return connection | |
def make_connection(self): | |
if self.created_connections >= self.max_connections: | |
raise Exception('Too Many Connections') | |
self.created_connections += 1 | |
return object() | |
def release(self, connection): | |
with self.not_full: | |
self.connections.append(connection) | |
def count(self): | |
return len(self.connections) | |
class ListConditionNotifyPool: | |
def __init__(self, max_connections): | |
self.max_connections = max_connections | |
self.created_connections = 0 | |
self.connections = [] | |
self.lock = threading.Lock() | |
self.not_empty = threading.Condition(self.lock) | |
self.not_full = threading.Condition(self.lock) | |
def get_connection(self): | |
with self.not_empty: | |
try: | |
connection = self.connections.pop() | |
except IndexError: | |
connection = self.make_connection() | |
self.not_full.notify() | |
return connection | |
def make_connection(self): | |
if self.created_connections >= self.max_connections: | |
raise Exception('Too Many Connections') | |
self.created_connections += 1 | |
return object() | |
def release(self, connection): | |
with self.not_full: | |
self.connections.append(connection) | |
self.not_empty.notify() | |
def count(self): | |
return len(self.connections) | |
class QueuePool: | |
def __init__(self, max_connections): | |
self.connections = LifoQueue(max_connections) | |
self.created_connections = 0 | |
while True: | |
try: | |
self.connections.put(None, block=False) | |
except Full: | |
break | |
def get_connection(self): | |
connection = None | |
try: | |
connection = self.connections.get(block=False) | |
except Empty: | |
raise Exception('No Connection Available') | |
if connection is None: | |
connection = self.make_connection() | |
return connection | |
def make_connection(self): | |
self.created_connections += 1 | |
return object() | |
def release(self, connection): | |
self.connections.put(connection, block=False) | |
def count(self): | |
return self.connections.qsize() | |
def worker(pool, num_reqs, results): | |
start = time.monotonic_ns() | |
for _ in range(num_reqs): | |
connection = pool.get_connection() | |
pool.release(connection) | |
results.append(time.monotonic_ns() - start) | |
def test(pool_class, num_threads, num_reqs): | |
threads = [] | |
pool = pool_class(num_threads) | |
results = [] | |
for i in range(num_threads): | |
threads.append( | |
threading.Thread(target=worker, args=(pool, num_reqs, results)) | |
) | |
for t in threads: | |
t.start() | |
for t in threads: | |
t.join() | |
print(f'{pool_class.__name__}: ' | |
f'{sum(results):,} ns, ') | |
test(ListLockPool, 10, 100_000) | |
test(ListConditionPool, 10, 100_000) | |
test(ListConditionNotifyPool, 10, 100_000) | |
test(QueuePool, 10, 100_000) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment