Created
June 15, 2017 08:58
-
-
Save ficapy/035d2ace6fb19a3cb04b560f0b1bda5b to your computer and use it in GitHub Desktop.
乐观锁和悲观锁以及事务隔离级别
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # !/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| # Author: ficapy | |
| import time | |
| import threading | |
| from threading import Thread | |
| from sqlalchemy import create_engine, Column, Integer | |
| from sqlalchemy.orm import sessionmaker | |
| from sqlalchemy.ext.declarative import declarative_base | |
# Database URL handed to create_engine(); the credentials and database name
# are placeholders ("xxx") that must be replaced before running the script.
sqlalchemy_db = 'postgresql+psycopg2://xxx:xxx@127.0.0.1/xxx'
# Declarative base class that the ORM model below inherits from.
Base = declarative_base()
class User(Base):
    """ORM model for the ``user`` table used by the locking experiment.

    The table holds an integer primary key, an integer ``count`` that the
    worker threads increment, and a non-nullable ``version_id`` column that
    is registered with the mapper as its version counter.
    """

    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    version_id = Column(Integer, nullable=False)
    count = Column(Integer)

    # The version column is wired in statically; how to add it dynamically
    # (i.e. toggle it per run) is still an unsolved problem.
    __mapper_args__ = dict(version_id_col=version_id)
# Engine shared by every session in this script; the pool is sized
# (pool_size=20, max_overflow=10) for the 20-way concurrency runs below.
uline_engine = create_engine(
    sqlalchemy_db,
    pool_recycle=3600,
    echo=False,
    pool_size=20,
    max_overflow=10,
)

# Bind the metadata first so that create_all() can be called without an
# explicit engine; checkfirst=True skips tables that already exist.
Base.metadata.bind = uline_engine
Base.metadata.create_all(checkfirst=True)

# Session factory bound to the shared engine.
db_Session = sessionmaker(bind=uline_engine)
def init():
    """Reset the experiment to a known state.

    If a ``User`` row already exists its ``count`` is set back to 0;
    otherwise a new row with ``count=0`` is inserted. The change is
    committed before returning.

    The session is always closed, even when the query or commit raises, so
    its connection goes back to the pool instead of leaking.
    """
    s = db_Session()
    try:
        init_record = s.query(User).first()
        if init_record is not None:
            init_record.count = 0
        else:
            s.add(User(count=0))
        s.commit()
    finally:
        s.close()
def auto_next(func):
    """Decorator that primes a generator-based coroutine.

    Calling the decorated function creates the generator, advances it once
    with ``next()`` so it is suspended at its first ``yield``, and returns
    it. Callers can therefore use ``.send(value)`` immediately.

    Args:
        func: A callable returning a generator (or any iterator).

    Returns:
        A wrapper with the same call signature as ``func`` that returns the
        already-advanced generator. The wrapper keeps ``func``'s name and
        docstring.
    """
    import functools

    @functools.wraps(func)
    def inner(*args, **kwargs):
        f = func(*args, **kwargs)
        # Run up to the first ``yield`` so the generator can accept send().
        next(f)
        return f

    return inner
@auto_next
def table_print(header):
    """Coroutine that prints a centred, pipe-separated text table.

    On creation it prints the header row and a dashed separator line. After
    that, every sequence passed in with ``.send(row)`` is printed as one
    more row using the same column layout.

    Args:
        header: Sequence of column titles. Each column is twice as wide as
            the string form of its title.
    """
    widths = [2 * len(str(title)) for title in header]
    # One centred "{:^N}" placeholder per column, joined with " | ".
    row_template = ' | '.join('{:^%d}' % width for width in widths)

    print(row_template.format(*header))
    print('-|-'.join('-' * width for width in widths))

    while True:
        row = yield
        print(row_template.format(*row))
def multi_thread(concurrency_num=1, isolation=None, version_control=False):
    """Increment the ``User.count`` row from 1000 threads and report the result.

    The row is first reset via ``init()``. Then 1000 threads are started;
    a semaphore lets at most ``concurrency_num`` of them work at a time.
    Each thread opens a session, reads the first ``User`` row, adds 1 to
    ``count`` and commits. When a database/ORM error is raised the attempt
    is counted as a retry and repeated with a fresh session until it
    succeeds.

    Args:
        concurrency_num: Maximum number of threads inside the critical
            section at once.
        isolation: When exactly ``True``, each transaction starts with
            ``set transaction isolation level Repeatable Read``.
        version_control: Only echoed in the returned tuple; this function
            does not use it to change any behaviour.

    Returns:
        A tuple ``(concurrency_num, bool(isolation), bool(version_control),
        final_count, elapsed_seconds_as_'%.2f'_string, retry_count)``.
    """
    # Local imports keep this block self-contained; sqlalchemy is already a
    # dependency of this file.
    from sqlalchemy import text
    from sqlalchemy.exc import SQLAlchemyError

    start = time.time()
    init()
    semaphore = threading.Semaphore(concurrency_num)
    # ``retry += 1`` is a read-modify-write on a shared counter, so guard it.
    retry_lock = threading.Lock()
    retry = 0

    def execute():
        nonlocal retry
        with semaphore:
            while True:
                session = db_Session()
                try:
                    if isolation is True:
                        session.execute(text('set transaction isolation level Repeatable Read'))
                    u = session.query(User).first()
                    u.count = u.count + 1
                    session.commit()
                except SQLAlchemyError:
                    # Only database/ORM failures (e.g. stale version,
                    # serialization conflict) are retried; programming
                    # errors propagate instead of spinning forever.
                    with retry_lock:
                        retry += 1
                else:
                    break
                finally:
                    # Always give the connection back to the pool; close()
                    # also discards any transaction left open by a failure.
                    session.close()

    thread_list = []
    for _ in range(1000):
        t = Thread(target=execute)
        thread_list.append(t)
        t.start()
    for t in thread_list:
        t.join()

    check_session = db_Session()
    try:
        ret = check_session.query(User).first().count
    finally:
        check_session.close()
    uline_engine.dispose()
    return concurrency_num, bool(isolation), bool(version_control), ret, '{:.2f}'.format(time.time() - start), retry
# Column titles for the result table; the order matches the tuple returned
# by multi_thread().
header = ['concurrency_num', 'isolation', 'version_control', 'result', 'time', 'retry']
table = table_print(header)

# One benchmark run per scenario, printed as a table row as soon as it ends.
scenarios = (
    dict(concurrency_num=1),
    dict(concurrency_num=20),
    dict(concurrency_num=20, version_control=True),
    dict(concurrency_num=20, isolation=True),
)
for scenario in scenarios:
    table.send(multi_thread(**scenario))
| # concurrency_num | isolation | version_control | result | time | retry | |
| # -------------------------------|--------------------|--------------------------------|--------------|----------|----------- | |
| # 1 | 0 | 0 | 1000 | 2.68 | 0 | |
| # 20 | 0 | 0 | 66 | 1.87 | 0 | |
| # 20 | 0 | 1 | 1000 | 2.78 | 121 | |
| # 20 | 1 | 0 | 1000 | 2.94 | 102 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment