import logging
import multiprocessing
from multiprocessing.managers import (
    BaseManager,
)
import tempfile
import time

import pytest

from eth.chains.ropsten import ROPSTEN_GENESIS_HEADER
from eth.db.backends.level import LevelDB
from eth.db.atomic import (
    AtomicDB,
)
from eth.db.chain import (
    ChainDB,
)

from trinity.db.eth1.manager import (
    create_db_server_manager,
    create_db_consumer_manager,
)
from trinity.config import (
    TrinityConfig,
)
from trinity.initialization import (
    initialize_data_dir,
)
from trinity.constants import ROPSTEN_NETWORK_ID
from trinity.db.rocksdb import RocksDB
from trinity.db.eth1.chain import AsyncChainDBProxy
from trinity.db.base import AsyncDBProxy
from trinity._utils.ipc import (
    wait_for_ipc,
    kill_process_gracefully,
)
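
# Benchmark layout:
#
# * A fresh database is seeded with a single key/value pair and served from a
#   dedicated process over Trinity's database IPC socket.
# * NUM_READER_PROCESSES reader processes each perform NUM_READS rounds of
#   reads against that database, either through the multiprocessing proxy
#   exposed over the IPC socket (LevelDB case) or through their own read-only
#   handle opened directly on the database files (RocksDB case).
# * The wall-clock time for all readers to finish is printed for each mode.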


def serve_db_process(dbtype):
    """Serve a freshly initialized ``dbtype`` database from a dedicated process.

    Creates the database in a temporary directory, seeds it with a single
    key/value pair, starts a process that serves it over Trinity's database
    IPC socket and yields ``(ipc_path, db_path)``. The server process is shut
    down once the consumer is done iterating.
    """
    def serve_chaindb(manager):
        server = manager.get_server()
        server.serve_forever()

    with tempfile.TemporaryDirectory() as temp_dir:
        trinity_config = TrinityConfig(
            network_id=ROPSTEN_NETWORK_ID,
            trinity_root_dir=temp_dir,
        )

        db_path = temp_dir + '/db'
        core_db = dbtype(db_path)
        core_db[b'key-a'] = b'value-a'

        initialize_data_dir(trinity_config)

        manager = create_db_server_manager(trinity_config, core_db)
        chaindb_server_process = multiprocessing.Process(
            target=serve_chaindb,
            args=(manager,),
        )
        chaindb_server_process.start()

        wait_for_ipc(trinity_config.database_ipc_path)

        try:
            yield trinity_config.database_ipc_path, db_path
        finally:
            kill_process_gracefully(chaindb_server_process, logging.getLogger())
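

# Benchmark parameters: NUM_READER_PROCESSES concurrent reader processes, each
# performing NUM_READS iterations of the read pattern in make_reads().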
NUM_READS = 10000
NUM_READER_PROCESSES = 3


def make_reads(db):
    # One round of reads: an existing key must be present with the expected
    # value, and a missing key must raise KeyError.
    for _ in range(NUM_READS):
        assert b'key-a' in db
        assert db[b'key-a'] == b'value-a'
        with pytest.raises(KeyError):
            db[b'not-present']


def launch_leveldb_reader_process(path):
    # LevelDB reader: all reads go through the multiprocessing proxy that the
    # server process exposes over the IPC socket at ``path``.
    manager = create_db_consumer_manager(path, connect=True)
    db = manager.get_db()
    make_reads(db)


def launch_rocksdb_reader_process(path):
    # RocksDB reader: reads use a local read-only handle opened directly on
    # the database files at ``path``, bypassing the IPC proxy.
    db = RocksDB(path, read_only=True)
    make_reads(db)


def run_benchmark(db_type, launcher):
    for (ipc_path, db_path) in serve_db_process(db_type):
        procs = []
        started_at = time.perf_counter()
        for _ in range(NUM_READER_PROCESSES):
            proc = launcher(ipc_path, db_path)
            proc.start()
            procs.append(proc)

        for proc in procs:
            proc.join()

        finished_at = time.perf_counter()
        print(f"Took {finished_at - started_at:.3f} seconds")


def benchmark_leveldb_via_proxy():
    print(f"1 process serving leveldb, {NUM_READER_PROCESSES} processes reading {NUM_READS} times through IPC proxy")
    run_benchmark(LevelDB, lambda ipc_path, db_path: multiprocessing.Process(
        target=launch_leveldb_reader_process, kwargs={'path': ipc_path}))


def benchmark_rocksdb_locally():
    print(f"1 process serving rocksdb, {NUM_READER_PROCESSES} processes reading {NUM_READS} times through local readonly connection")
    run_benchmark(RocksDB, lambda ipc_path, db_path: multiprocessing.Process(
        target=launch_rocksdb_reader_process, kwargs={'path': db_path}))


if __name__ == '__main__':
    benchmark_leveldb_via_proxy()
    benchmark_rocksdb_locally()
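
# Usage: run this file directly with Python 3 (the file name below is only an
# example, use whatever name this gist is saved under):
#
#     python benchmark_db_reads.py
#
# Each benchmark prints a one-line description followed by the elapsed time.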