Last active
June 30, 2018 11:57
-
-
Save nstewart/933227d6d51406688057981cc87201db to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function

import sys

from sqlalchemy import create_engine, Column, Integer
from sqlalchemy import exc
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Declarative base that every mapped model in this script inherits from.
Base = declarative_base()


class Account(Base):
    """ORM model mapped to the "accounts" database table."""

    __tablename__ = 'accounts'

    # Integer primary key identifying the account.
    id = Column(Integer, primary_key=True)
    # Integer balance held by the account.
    balance = Column(Integer)
# Build the engine from the connection URL passed as the first command-line
# argument. Use a "cockroachdb://" URL to connect to a cockroach database.
engine = create_engine(sys.argv[1], echo=True)

# Session factory bound to that engine.
Session = sessionmaker(bind=engine)

# Emit the schema for every model registered on Base (the "accounts" table).
Base.metadata.create_all(engine)

# The single session shared by the rest of the script.
session = Session()
def add_account(session, id, balance):
    """Insert an account row unless one with the given id already exists.

    Args:
        session: SQLAlchemy session used for both the lookup and the insert.
        id: Primary key of the account.
        balance: Balance stored on the new row; ignored when the account
            is already present.

    The session is committed only when a new row was added.
    """
    # first() yields None when nothing matches. The previous one() call
    # raises NoResultFound instead, so the "account is missing" branch below
    # could never run and seeding an empty table crashed.
    existing = session.query(Account).filter_by(id=id).first()
    if existing is None:
        session.add(Account(id=id, balance=balance))
        session.commit()
def print_account_balances():
    """Print the id and balance of every account, one account per line.

    Reads through the module-level ``session``.
    """
    for row in session.query(Account):
        print(row.id, row.balance)
def run_transaction(session, op):
    """Run ``op(session)`` inside a retryable transaction.

    ``op`` is called again, with the same session, for as long as the
    database server asks for the transaction to be retried. On success the
    savepoint is released and the session is committed.

    Args:
        session: Session on which the savepoint statements and the commit
            are issued; also passed to ``op``.
        op: Callable taking the session and performing the transactional
            work. It may be invoked more than once.

    Raises:
        sqlalchemy.exc.OperationalError: when the database reports an
            operational error that is not a retry request.
    """
    # NOTE(review): the savepoint is opened on whatever transaction the
    # session already has in progress -- confirm the server accepts that.
    session.execute("SAVEPOINT cockroach_restart")
    while True:
        try:
            # Attempt the work.
            op(session)
            # If we reach this point, commit.
            session.execute("RELEASE SAVEPOINT cockroach_restart")
            session.commit()
            break
        except exc.OperationalError as e:
            # Retry requests are identified by SQLSTATE 40001 on the
            # underlying driver error. Checking the code avoids referencing
            # the psycopg2 module (which this file never imports) and also
            # matches driver-specific subclasses of the rollback error.
            if getattr(e.orig, "pgcode", None) != "40001":
                # A non-retryable error; report this up the call stack with
                # its original traceback.
                raise
            # Signal the database that we'll retry.
            session.execute("ROLLBACK TO SAVEPOINT cockroach_restart")
def transfer_funds(session, frm, to, amount):
    """Move ``amount`` from account ``frm`` to account ``to``.

    The function does not commit; the caller owns the transaction.

    Args:
        session: SQLAlchemy session to run the queries on.
        frm: Id of the account to debit.
        to: Id of the account to credit.
        amount: Amount to transfer.

    Raises:
        ValueError: if the source account's balance is below ``amount``.
    """
    # Check the current balance. one() is used so that a missing source
    # account surfaces as an error instead of being silently skipped.
    source = session.query(Account).filter_by(id=frm).one()
    if source.balance < amount:
        # Raising a bare string is itself a TypeError; raise a real
        # exception that carries the message.
        raise ValueError("Insufficient funds")

    # Perform the transfer: debit through the loaded object, credit with an
    # UPDATE expressed relative to the stored balance.
    source.balance = source.balance - amount
    session.query(Account).filter_by(id=to).update(
        {"balance": (Account.balance + amount)}
    )
try:
    # Seed the two accounts used by the demo (no-op for ids already present).
    add_account(session, 1, 1000)
    add_account(session, 2, 800)

    # Show balances, execute the transfer in a retryable transaction, then
    # show balances again.
    print_account_balances()
    # Use the session handed to the callback rather than capturing the
    # global one, so the operation runs on whatever run_transaction passes.
    run_transaction(session, lambda s: transfer_funds(s, 1, 2, 100))
    print_account_balances()
finally:
    # Close communication with the database even if a step above failed.
    session.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment