Last active
June 15, 2023 01:52
-
-
Save bthaman/633f648e41b3680e7258b4945bba9da0 to your computer and use it in GitHub Desktop.
How to do a SQL Server transaction rollback in pyodbc. Demonstrates how to keep your database consistent when performing a series of updates using pyodbc, and something goes wrong somewhere in the middle of it all.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Demonstrate how to keep your SQL Server database consistent when performing a series of updates using pyodbc, and something goes wrong | |
somewhere in the middle of it all. | |
Transactions are managed at the connection level (not the cursor level). When creating the connection, set autocommit=False. | |
When a command (e.g., an update) is executed against the connection, it will not be committed automatically. | |
If you are executing multiple commands against the connection, and an error is raised before all the commands are complete, | |
your database may not be consistent. But, since the commands were not committed, it's ok. | |
A pyodbc connection provides both commit() and rollback(). When handling an error, call rollback() on the connection
(or simply do not commit) and then close it. Closing a connection that still has uncommitted commands also rolls them back.
""" | |
import pyodbc | |
class SqlConnection:
    """Wrap one pyodbc connection and one cursor with explicit transaction control.

    The connection is opened with ``autocommit=False``, so nothing executed
    through this object is persisted until ``commit()`` is called (directly, or
    via ``command_execute(..., commit=True)``). Pending work can be discarded
    with ``rollback()``.

    Instances can be used as context managers: leaving the ``with`` block
    because of an exception rolls back, and the connection is always closed on
    exit. The context manager never commits; call ``commit()`` explicitly.
    """

    def __init__(self, cnn_string):
        """Open the connection and create the cursor shared by all methods.

        Args:
            cnn_string: ODBC connection string handed to ``pyodbc.connect``.
        """
        self.cnn_string = cnn_string
        self.connection = pyodbc.connect(self.cnn_string, autocommit=False)
        try:
            self.cursor = self.connection.cursor()
        except Exception:
            # Do not leak the freshly opened connection if no cursor can be created.
            self.connection.close()
            raise
        # Outcome of the most recent query_execute() call (rows or an exception).
        self.results = None

    def __enter__(self):
        """Return this object so it can be bound with ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Roll back if the block raised, then close; never suppress the exception."""
        try:
            if exc_type is not None:
                self.rollback()
        finally:
            self.close_cnn()
        return False

    def close_cnn(self):
        """Close the underlying connection."""
        self.connection.close()

    def commit(self):
        """Commit everything executed on the connection since the last commit."""
        self.connection.commit()

    def rollback(self):
        """Discard everything executed on the connection since the last commit."""
        self.connection.rollback()

    def command_execute(self, sql_command, commit=False, params=()):
        """Execute a statement that returns no rows (e.g. an update).

        Args:
            sql_command: SQL text, optionally containing ``?`` placeholders.
            commit: When true, commit the connection right after executing.
            params: Sequence of values for the ``?`` placeholders, in order.

        Returns:
            The cursor, so the caller can inspect e.g. ``rowcount``.

        Errors raised while executing or committing propagate to the caller.
        """
        # Unpacking an empty sequence makes this identical to execute(sql_command).
        self.cursor.execute(sql_command, *params)
        if commit:
            self.connection.commit()
        return self.cursor

    def getresults(self):
        """Return whatever the last ``query_execute()`` call stored."""
        return self.results

    def query_execute(self, sql_command, params=()):
        """Execute a query and fetch all of its rows.

        Args:
            sql_command: SQL text, optionally containing ``?`` placeholders.
            params: Sequence of values for the ``?`` placeholders, in order.

        Returns:
            The fetched rows. If executing or fetching fails, the exception
            instance itself is stored in ``self.results`` and returned instead
            of being raised, so callers should check the result with
            ``isinstance(result, Exception)`` before iterating it.
        """
        try:
            self.cursor.execute(sql_command, *params)
            self.results = self.cursor.fetchall()
        except Exception as e:
            # Kept for backward compatibility: failures are reported by value.
            self.results = e
        return self.results
if __name__ == '__main__':
    # Demo script: run two updates that must both succeed, fail in between,
    # and show that nothing from the unfinished transaction is persisted.

    # Connection string formatted for SQLEXPRESS; yours may look different.
    cnn_string = 'Driver={SQL Server};Server=<your server>;Database=<your db>;'
    # For demo purposes an error is raised between the two updates. Set this to
    # False to let both updates run and be committed.
    simulate_failure = True

    # Bound up front so the cleanup code can tell whether connecting succeeded.
    conn = None
    try:
        conn = SqlConnection(cnn_string=cnn_string)
        # Attempt to perform two updates. This is a made-up situation, but assume
        # that both updates must be executed successfully for the db to remain
        # in a consistent state.
        # 1st update:
        sql = "update Forecast set PM = 'WJT' where PM = 'WJT2'"
        cursor = conn.command_execute(sql, commit=False)
        print(str(cursor.rowcount) + ' row(s) modified (NOT committed yet!)')
        # Some more logic might go here...
        if simulate_failure:
            # Fail before the 2nd update, i.e. while the db is not consistent.
            raise ValueError('Error occurred before 2nd update. Nothing will be committed!')
        # 2nd update required to make the db consistent:
        sql = "update PM set Central_PM = 'WJT' where Central_PM = 'WJT2'"
        cursor = conn.command_execute(sql, commit=False)
        print(str(cursor.rowcount) + ' row(s) modified (NOT committed yet!)')
        # After all logic has executed the db is consistent again, so commit.
        conn.commit()
        print('Connection, and all commands against it, comitted.')
    except pyodbc.OperationalError as e:
        print('Could not establish connection: ' + str(e))
    except ValueError as e:
        print(str(e))
    finally:
        if conn is not None:
            try:
                # Explicitly discard anything that was executed but not committed.
                # After a successful commit there is nothing left to undo.
                conn.connection.rollback()
            except pyodbc.Error as e:
                print('Rollback failed: ' + str(e))
            try:
                conn.close_cnn()
            except pyodbc.Error as e:
                print('Could not close connection: ' + str(e))

    # For demo purposes, query one of the tables on a fresh connection. Because
    # the transaction above was rolled back rather than committed, the original
    # values should be shown when simulate_failure is True.
    conn = None
    try:
        conn = SqlConnection(cnn_string=cnn_string)
        sql = 'select Central_PM from PM'
        results = conn.query_execute(sql)
        if isinstance(results, Exception):
            # query_execute reports failures by returning the exception object.
            print('Verification query failed: ' + str(results))
        else:
            for row in results:
                value = row[0]
                # Non-string values (e.g. NULL) have no strip(); print them as-is.
                print(value.strip() if isinstance(value, str) else value)
    except pyodbc.Error as e:
        print('Could not verify table contents: ' + str(e))
    finally:
        if conn is not None:
            try:
                conn.close_cnn()
            except pyodbc.Error as e:
                print('Could not close connection: ' + str(e))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment