Created
October 4, 2014 10:54
-
-
Save giwa/b695276b3e544f84c595 to your computer and use it in GitHub Desktop.
Simple Bench mark for dbs via python api
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Bench mark for dbs via python api | |
# Support DBs | |
# - MongoDB via pymongo | |
# - Sqlite3 via sqlite3 | |
# | |
import sys | |
import time | |
from pymongo import MongoClient | |
DB = "bench" | |
COLLECTION = TABLE = "test" | |
TABLE = "test" | |
OUT_TMP = """\ | |
================================ | |
Test: {test_name} | |
The number of insert data: {num_insert} | |
Insert time: {insert_time} | |
The number of select data: {num_select} | |
Select time: {select_time} | |
Error rate: {error} | |
================================ | |
""" | |
class DBBench(object): | |
def __init__(self): | |
self._insert_num = None | |
self._insert_time = None | |
self._select_num = None | |
self._select_time = None | |
def insert_data(self, num): | |
raise NotImplementedError | |
def select_all_data(self): | |
raise NotImplementedError | |
def show(self): | |
print OUT_TMP.format(test_name=self.__class__.__name__, | |
num_insert=self._insert_num, | |
insert_time=self._insert_time, | |
num_select=self._select_num, | |
select_time=self._select_time, | |
error= 1 - self._insert_num / self._select_num) | |
class MongoBench(DBBench): | |
def __init__(self): | |
super(MongoBench, self).__init__() | |
self._con = MongoClient('localhost', 27017) | |
# Create db if not | |
self._db = self._con[DB] | |
# Drop collection if exsit | |
if COLLECTION in self._db.collection_names(): | |
self._db[COLLECTION].drop() | |
self._col = self._db.create_collection(COLLECTION) | |
def insert_data(self, num): | |
self._insert_num = num | |
start_time = time.time() | |
for i in range(1, num + 1): | |
data = {str(i):i} | |
self._col.insert(data) | |
self._insert_time = time.time() - start_time | |
def select_all_data(self): | |
start_time = time.time() | |
i = 0 | |
for data in self._col.find(): | |
i += 1 | |
self._select_time = time.time() - start_time | |
self._select_num = i | |
import sqlite3 | |
CREATE_TABLE_SQLITE3="""\ | |
CREATE TABLE IF NOT EXISTS test | |
(key text, value int) | |
""" | |
INSERT_SQLITE3="""\ | |
INSERT INTO test | |
VALUES ({key}, {value}) | |
""" | |
SELECT_SQLITE3="""\ | |
SELECT * FROM test | |
""" | |
DROP_TABLE_SQLITE3="""\ | |
DROP TABLE IF EXISTS test | |
""" | |
class Sqlite3Bench(DBBench): | |
def __init__(self, db="/tmp/test.db"): | |
super(Sqlite3Bench, self).__init__() | |
self._con = sqlite3.connect(db) | |
self._init_table() | |
def _init_table(self): | |
with self._con: | |
self._con.execute(DROP_TABLE_SQLITE3) | |
self._con.execute(CREATE_TABLE_SQLITE3) | |
def insert_data(self, num): | |
self._insert_num = num | |
start_time = time.time() | |
with self._con: | |
for i in range(1, num + 1): | |
sql = INSERT_SQLITE3.format(key=str(i), | |
value=str(i)) | |
self._con.execute(sql) | |
self._insert_time = time.time() - start_time | |
def select_all_data(self): | |
start_time = time.time() | |
i = 0 | |
for row in self._con.execute(SELECT_SQLITE3): | |
i += 1 | |
self._select_time = time.time() - start_time | |
self._select_num = i | |
class Sqlite3MemoryBench(Sqlite3Bench): | |
def __init__(self): | |
super(Sqlite3MemoryBench, self).__init__(":memory:") | |
def run_db_benches(num): | |
dbs = [MongoBench(), | |
Sqlite3Bench(), | |
Sqlite3MemoryBench()] | |
results = list() | |
for db in dbs: | |
db.insert_data(num) | |
db.select_all_data() | |
results.append(db) | |
for result in results: | |
result.show() | |
if __name__ == "__main__": | |
if len(sys.argv) == 2: | |
print >> sys.stderr, "Usage: db_bench.py <number of data>" | |
exit(-1) | |
num = int(sys.argv[1]) | |
run_db_benches(num) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment