-
-
Save vaniakov/e6251d38599ea722858a956a72ce2e2c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# gevent's monkey patching is applied here, before any of the imports below
# are executed, so everything imported afterwards (including the DB driver)
# is loaded with patch_all() already in effect.
import gevent.monkey
gevent.monkey.patch_all()
import collections
import threading
import time
import random
import sys
import logging
logging.basicConfig()
log = logging.getLogger('foo')
# Only CRITICAL records pass this logger, so the log.error() calls made by
# the pool below are effectively silenced; lower the level to see them.
log.setLevel(logging.CRITICAL)
# The DBAPI driver under test; the commented-out import is an alternative
# driver that can be swapped in under the same ``dbapi`` name.
import pymysql as dbapi
#from mysql import connector as dbapi
class SimplePool(object):
    """Minimal fixed-size pool of DBAPI connections.

    Every connection is opened eagerly in the constructor and kept in a
    deque.  ``get()`` pops a connection, ``return_conn()`` rolls it back and
    puts it back, and ``chuck_conn()`` closes it and checks in a freshly
    opened replacement.

    :param size: number of connections opened up front; also the cap that
        ``chuck_conn()`` applies before opening a replacement.
    """

    def __init__(self, size=50):
        self.size = size
        self.checkedin = collections.deque(
            self._connect() for _ in range(size)
        )
        self.checkout_lock = threading.Lock()
        self.checkin_lock = threading.Lock()
        # alternate form ("deferred reconnect"): count discarded connections
        # here and let get() open the replacements lazily.
        # self.makeup_count = 0

    def _connect(self):
        """Open and return one new connection to the local test database."""
        return dbapi.connect(
            user="scott", passwd="tiger",
            host="localhost", db="test")

    def get(self):
        """Check a connection out, polling every 0.1s while the pool is empty.

        The checkout lock is held for the whole wait, so a single caller
        polls the deque while the remaining callers queue on the lock.
        """
        with self.checkout_lock:
            while not self.checkedin:
                # reconnect here if we did the "defer" version
                # if self.makeup_count:
                #     self.makeup_count -= 1
                #     return self._connect()
                time.sleep(.1)
            return self.checkedin.pop()

    def chuck_conn(self, conn):
        """Close ``conn`` and check in a replacement if the pool is short.

        A failure while closing is logged and otherwise ignored.  Only
        ``Exception`` is handled, so interrupts and timeouts that derive
        from ``BaseException`` still reach the caller.
        """
        try:
            conn.close()
        except Exception:
            log.error("Exception during close", exc_info=True)
        # defer reconnection..
        # self.makeup_count += 1
        # or do it now, but check size of pool
        if len(self.checkedin) < self.size:
            conn = self._connect()
            with self.checkin_lock:
                self.checkedin.append(conn)

    def return_conn(self, conn):
        """Roll ``conn`` back and check it in; discard it if rollback fails.

        Only ``Exception`` is handled: a ``BaseException`` raised during the
        rollback propagates and the connection is *not* checked in, leaving
        the caller to decide what to do with it.
        """
        try:
            conn.rollback()
        except Exception:
            log.error("Exception during rollback", exc_info=True)
            self.chuck_conn(conn)
        else:
            with self.checkin_lock:
                self.checkedin.append(conn)
def verify_connection_id(conn):
    """Return the server-side connection id of ``conn``, or None on failure.

    Runs ``select connection_id()`` on a fresh cursor and returns the first
    column of the first row.  The cursor is always closed.

    :param conn: DBAPI connection to probe.
    :return: the connection id, or ``None`` if the query raised an
        ``Exception`` or produced no row.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("select connection_id()")
        row = cursor.fetchone()
        return row[0]
    except Exception:
        # Driver/database errors mean the connection cannot be verified.
        # BaseException subclasses (interrupts, timeouts) are deliberately
        # not converted into a "None" result; they propagate to the caller.
        return None
    finally:
        cursor.close()
def execute_sql(conn, sql, params=()):
    """Execute one statement on a fresh cursor and return its ``lastrowid``.

    :param conn: DBAPI connection to run the statement on.
    :param sql: statement text, using the driver's parameter placeholders.
    :param params: parameters bound to ``sql``; defaults to none.
    :return: ``cursor.lastrowid`` as reported after the statement ran.
    :raises: whatever ``cursor.execute`` raises; the cursor is closed first.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        # Read before closing, while the cursor is still open.
        return cursor.lastrowid
    finally:
        # Close on the error path too so a failed statement does not leak
        # the cursor.
        cursor.close()
# Module-level pool shared by the worker greenlets; constructing it opens all
# of the pool's connections immediately.
pool = SimplePool()
# Presumably the query to run by hand afterwards to spot the inconsistency
# this script provokes (table_b rows whose table_a row is missing):
# SELECT * FROM table_b WHERE a_id not in
# (SELECT id FROM table_a) ORDER BY a_id DESC;
# Schema reset statements, executed in list order: table_b is dropped before
# table_a and re-created after it.
PREPARE_SQL = [
    "DROP TABLE IF EXISTS table_b",
    "DROP TABLE IF EXISTS table_a",
    """CREATE TABLE table_a (
id INT NOT NULL AUTO_INCREMENT,
data VARCHAR (256) NOT NULL,
PRIMARY KEY (id)
) engine='InnoDB'""",
    """CREATE TABLE table_b (
id INT NOT NULL AUTO_INCREMENT,
a_id INT NOT NULL,
data VARCHAR (256) NOT NULL,
-- uncomment this to illustrate where the driver is attempting
-- to INSERT the row during ROLLBACK
-- FOREIGN KEY (a_id) REFERENCES table_a(id),
PRIMARY KEY (id)
) engine='InnoDB'
"""]
# Run the schema reset on one pooled connection, commit, and check the
# connection back in before any worker starts.
connection = pool.get()
for sql in PREPARE_SQL:
    execute_sql(connection, sql)
connection.commit()
pool.return_conn(connection)
print("Table prepared...")
def transaction_kill_worker():
    """Loop forever running a two-INSERT transaction under a 0.1s timeout.

    Each iteration checks a connection out of the module-level ``pool``,
    inserts a row into table_a, sleeps a random 0-0.2s, inserts a dependent
    row into table_b and commits, all inside ``gevent.Timeout(0.1)``.
    One status character is written to stdout per outcome:

    * ``.`` - the iteration ended without an exception escaping
    * ``$`` - the second INSERT or the commit failed and was rolled back
      (this is followed by ``.`` because the error was handled)
    * ``#`` - the gevent timeout fired
    * ``@`` - any other exception

    The connection is handed back to the pool exactly once per iteration,
    in the ``finally`` clause: it is discarded and replaced after a timeout
    and returned normally otherwise.
    """
    while True:
        connection = None
        timed_out = False
        try:
            with gevent.Timeout(0.1):
                connection = pool.get()
                rowid = execute_sql(
                    connection,
                    "INSERT INTO table_a (data) VALUES (%s)", ("a",))
                gevent.sleep(random.random() * 0.2)
                try:
                    execute_sql(
                        connection,
                        "INSERT INTO table_b (a_id, data) VALUES (%s, %s)",
                        (rowid, "b",))
                    # this version prevents the commit from
                    # proceeding on a bad connection
                    # if verify_connection_id(connection):
                    #     connection.commit()
                    # this version does not.  It will commit the
                    # row for table_b without the table_a being present.
                    connection.commit()
                except Exception:
                    connection.rollback()
                    sys.stdout.write("$")
        except gevent.Timeout:
            # The connection was interrupted mid-operation, so it is closed
            # and replaced rather than reused.
            timed_out = True
            sys.stdout.write("#")
        except Exception:
            # logger.exception(e)
            sys.stdout.write("@")
        else:
            sys.stdout.write(".")
        finally:
            # Single check-in point: checking the connection in from several
            # branches would put the same object into the pool more than
            # once, letting two workers share it.
            if connection is not None:
                if timed_out:
                    pool.chuck_conn(connection)
                else:
                    pool.return_conn(connection)
def main():
    """Start 50 worker greenlets and keep the main greenlet alive forever.

    After spawning, the main greenlet sleeps 3 seconds once and then sleeps
    in 5-second steps indefinitely; it never returns.
    """
    for _ in range(50):
        gevent.spawn(transaction_kill_worker)

    gevent.sleep(3)

    # Nothing left to do here except idle while the workers run.
    while True:
        gevent.sleep(5)
# Start the workers only when the file is run as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment