@AFirooz
Last active April 29, 2025 05:22
Demo aiomysql
# Topic: Connection & Cursor
# Note: this doesn't work in Jupyter notebooks (the notebook's event loop is already running,
# so `loop.run_until_complete()` fails)!
# https://aiomysql.readthedocs.io/en/stable/cursors.html
# #############################################################################################################
# A cursor for a connection.
# It allows Python code to execute MySQL commands in a database session. Cursors are created by
# `Connection.cursor()`: they are bound to the connection for its entire lifetime, and all commands are
# executed in the context of the database session wrapped by the connection.
#
# Cursors created from the same connection are not isolated,
# i.e., any changes done to the database by one cursor are immediately visible to the other cursors
# (see the sketch right after the imports below).
# Cursors created from different connections may or may not be isolated, depending on the connections' isolation level.
# #############################################################################################################
import asyncio
import aiomysql
import time
import mypassword as ps
loop = asyncio.get_event_loop()
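# Topic: cursors from one connection share the same session (sketch)
# A minimal, hedged illustration of the isolation note above: two cursors created from the same
# connection see each other's uncommitted changes. This helper is not called by the demo below,
# and the temporary table name `t` is made up for the example.
async def cursor_sharing_sketch():
    conn = await aiomysql.connect(host=ps.host_name, user=ps.user_name, password=ps.sql_password,
                                  db=ps.schema, loop=loop, autocommit=False)
    cur1 = await conn.cursor()
    cur2 = await conn.cursor()
    await cur1.execute("CREATE TEMPORARY TABLE t (id INT);")
    await cur1.execute("INSERT INTO t VALUES (1);")
    # cur2 runs in the same session, so the uncommitted row is already visible to it
    await cur2.execute("SELECT COUNT(*) FROM t;")
    print(await cur2.fetchone())  # (1,)
    await cur1.close()
    await cur2.close()
    conn.close()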
def sleep():
    # note: time.sleep() blocks the whole event loop; asyncio.sleep() is the async equivalent
    print('\n >>> Sleeping...', end='')
    time.sleep(3)
    print('Done sleeping <<<\n')
async def test_example():
    conn = await aiomysql.connect(host=ps.host_name, user=ps.user_name, password=ps.sql_password, db=ps.schema,
                                  loop=loop,  # asyncio event loop instance
                                  )
    # create the default cursor using `Connection.cursor()`
    cur = await conn.cursor()
    # cur.execute(query, args=None) -- args is a tuple or list of arguments for the SQL query
    await cur.execute("SELECT ID FROM Genes LIMIT %s;", (10,))

    # Topic: iterating over a list of parameters
"""
# INSERT statements are optimized by batching the data, that is using the MySQL multiple rows syntax.
data = [
('Jane','555-001'),
('Joe', '555-001'),
('John', '555-003')
]
stmt = "INSERT INTO employees (name, phone) VALUES ('%s','%s')"
await cursor.executemany(stmt, data)
"""
    # refer to https://aiomysql.readthedocs.io/en/stable/cursors.html#Cursor.description to understand the printed output
    print(f"{cur.description=}")

    # fetch n results
    r = await cur.fetchmany(size=2)
    print(f"result 1, 2: {r=}")

    # fetch the next result
    r = await cur.fetchone()
    print(f"result 3: {r=}")

    r = await cur.fetchmany(2)
    print(f"result 4, 5: {r=}")

    # fetch all (remaining) results
    r = await cur.fetchall()
    print(f"remaining results: {r=}")

    # move the cursor back to the beginning of the result set
    await cur.scroll(0, mode='absolute')  # mode='absolute' | 'relative' (default)
    r = await cur.fetchall()
    print(f"all results: {r=}")

    # `cur.rowcount` is -1 if no `cursor.execute()` has been performed on the cursor, or if
    # the row count of the last operation can't be determined by the interface.
    print(f"\nThe number of rows produced (or affected) by the last SQL statement: {cur.rowcount}")

    # detach the cursor from the connection
    await cur.close()
    print("---------------------")
    # Topic: return a dictionary
    # create a dict cursor
    cur = await conn.cursor(aiomysql.DictCursor)
    await cur.execute("SELECT ID, AKT1 FROM Genes LIMIT %s;", (5,))
    r = await cur.fetchone()
    print(f"result 1: {r=}")
    r = await cur.fetchall()
    print(f"remaining results: {r=}")
    try:
        print(f"{r[0].ID=}")
    except AttributeError:
        print("\nX X X X X X Can't access the attributes directly for 'r[0].ID' X X X X X X\n")
    await cur.close()
    print("---------------------")
    # Topic: pass a class with attribute names
    class AttrDict(dict):
        """Dict whose values can be read via dot access; missing keys return None instead of raising KeyError."""
        def __getattr__(self, name):
            try:
                return self[name]
            except KeyError:
                return None

    class AttrDictCursor(aiomysql.DictCursor):
        dict_type = AttrDict

    cur = await conn.cursor(AttrDictCursor)
    await cur.execute("SELECT ID, AKT1 FROM Genes LIMIT %s;", (5,))
    r = await cur.fetchone()
    print(f"result 1: {r=}")
    r = await cur.fetchall()
    print(f"remaining results: {r=}")
    print(f"{r[0].ID=}")
    await cur.close()
    print("---------------------")
    # Topic: pass a cursor class to cursor()
    """
    # class SSCursor
    Unbuffered cursor, mainly useful for queries that return a lot of data, or for connections to remote
    servers over a slow network. Instead of copying every row of data into a buffer, this will fetch rows as
    needed. The upside is that the client uses much less memory, and rows are returned much faster when
    traveling over a slow network, or if the result set is very big.
    There are limitations, though. The MySQL protocol doesn't support returning the total number of rows, so
    the only way to tell how many rows there are is to iterate over every row returned. Also, it currently
    isn't possible to scroll backwards, as only the current row is held in memory. All methods are the same
    as in Cursor, but with different behaviour.
    # class SSDictCursor
    An unbuffered cursor which returns results as dictionaries.
    """
    cur = await conn.cursor(aiomysql.SSCursor)
    await cur.execute("SELECT ID, AKT1 FROM Genes LIMIT %s;", (5,))
    r = await cur.fetchone()
    print(f"result 1: {r=}")
    await cur.close()
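
    # Hedged sketch of streaming over an SSCursor: since only the current row is held in memory (see the
    # note above), a fetchone() loop is the usual way to walk a large result set without buffering it all.
    # It reuses the same demo table; the row counting is just for illustration.
    cur = await conn.cursor(aiomysql.SSCursor)
    await cur.execute("SELECT ID FROM Genes LIMIT %s;", (5,))
    n_rows = 0
    while True:
        row = await cur.fetchone()
        if row is None:
            break
        n_rows += 1
    print(f"streamed {n_rows} rows one at a time")
    await cur.close()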
    # close the connection
    conn.close()
if __name__ == '__main__':
    loop.run_until_complete(test_example())
    print("Done")
import asyncio
import aiomysql
import mypassword as ps
loop = asyncio.get_event_loop()
# Topic: Pooling
# The library provides connection pool as well as plain Connection objects.
async def go():
    """
    Creates a pool of connections to the MySQL database
    (see https://aiomysql.readthedocs.io/en/stable/pool.html#create_pool for more args).
    After creation the pool has minsize free connections and can grow up to maxsize connections.
    If minsize is 0, the pool doesn't create any connections on startup.
    If maxsize is 0, the size of the pool is unlimited (but it still recycles used connections, of course).
    A sketch of the minsize/maxsize knobs follows this function.
    """
    pool = await aiomysql.create_pool(host=ps.host_name, user=ps.user_name, password=ps.sql_password, db=ps.schema,
                                      loop=loop, autocommit=False)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT ID FROM Genes LIMIT %s;", (10,))
            # print(cur.description)
            r = await cur.fetchone()
    pool.close()
    await pool.wait_closed()  # wait for the actual closing of all acquired connections
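# A hedged sketch of the minsize/maxsize knobs described in go()'s docstring: a small pool shared by
# several concurrent queries. The helper name and the pool sizes are made up for illustration.
async def pool_sizes_sketch():
    pool = await aiomysql.create_pool(host=ps.host_name, user=ps.user_name, password=ps.sql_password,
                                      db=ps.schema, minsize=1, maxsize=5, loop=loop)

    async def one_query(n):
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT %s;", (n,))
                return await cur.fetchone()

    # at most `maxsize` queries run concurrently; extra acquires wait for a free connection
    results = await asyncio.gather(*(one_query(i) for i in range(10)))
    print(results)
    pool.close()
    await pool.wait_closed()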
# Topic: autocommit
async def test_example_execute(loop):
    conn = await aiomysql.connect(host=ps.host_name, user=ps.user_name, password=ps.sql_password, db=ps.schema,
                                  loop=loop, autocommit=False)
    async with conn.cursor() as cur:
        await cur.execute("DROP TABLE IF EXISTS music_style;")
        await cur.execute(
            """CREATE TABLE music_style (id INT, name VARCHAR(255), PRIMARY KEY (id));"""
        )
        await conn.commit()

        # Having autocommit=False requires us to commit the changes manually, but this makes it more
        # efficient to run multiple queries together (a rollback variant is sketched below).
        # insert 3 rows one by one
        await cur.execute("INSERT INTO music_style VALUES(1,'heavy metal');")
        await cur.execute("INSERT INTO music_style VALUES(2,'death metal');")
        await cur.execute("INSERT INTO music_style VALUES(3,'power metal');")
        await conn.commit()
    conn.close()
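# A hedged sketch of the manual-commit pattern above with an explicit rollback on error: with
# autocommit=False, wrapping related statements in begin()/commit() and rolling back on failure keeps the
# table consistent. The helper name is made up; it reuses the music_style table created above.
async def transaction_sketch(loop):
    conn = await aiomysql.connect(host=ps.host_name, user=ps.user_name, password=ps.sql_password, db=ps.schema,
                                  loop=loop, autocommit=False)
    async with conn.cursor() as cur:
        await conn.begin()
        try:
            await cur.execute("INSERT INTO music_style VALUES(4,'doom metal');")
            await cur.execute("INSERT INTO music_style VALUES(5,'folk metal');")
            await conn.commit()  # both rows become visible together
        except Exception:
            await conn.rollback()  # neither row is kept if anything failed
            raise
    conn.close()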
if __name__ == '__main__':
    loop.run_until_complete(go())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test_example_execute(loop))
import asyncio
import aiomysql
import mypassword as ps
async def test_connection():
    try:
        conn = await aiomysql.connect(
            host=ps.host_name,
            # port=3306,
            user=ps.user_name,
            password=ps.sql_password,
            db=ps.schema,
        )
        print("Connection successful!")
        conn.close()
    except Exception as e:
        print(f"Connection failed: {str(e)}")
if __name__ == '__main__':
    asyncio.run(test_connection())