Last active
April 29, 2025 05:22
-
-
Save AFirooz/dd8d52a973174f992afb9425690aac5e to your computer and use it in GitHub Desktop.
Demo aiomysql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Topic: Connection & Cursor | |
# this doesn't work in Jupyter notebooks! | |
# https://aiomysql.readthedocs.io/en/stable/cursors.html | |
# ############################################################################################################# | |
# A cursor for a connection.
# Allows Python code to execute MySQL commands in a database session. Cursors are created by `Connection.cursor()`;
# they are bound to the connection for their entire lifetime, and all commands are executed in the context of the
# database session wrapped by the connection.
#
# Cursors that are created from the same connection are not isolated,
# i.e., any changes done to the database by one cursor are immediately visible to the other cursors.
# Cursors created from different connections may or may not be isolated, depending on the connections' isolation level.
# ############################################################################################################# | |
import asyncio | |
import aiomysql | |
import time | |
import mypassword as ps | |
# Module-level event loop shared by the demo below.
# `asyncio.get_event_loop()` is deprecated when no current loop exists (DeprecationWarning on Python 3.12,
# RuntimeError on 3.14), so the loop is created explicitly and registered as the current one.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
def sleep(seconds=3):
    """Block the calling thread for `seconds` seconds, announcing the pause on stdout.

    :param seconds: how long to pause, in seconds; defaults to 3, the delay that used to be hard-coded
    :return: None

    NOTE: this relies on the blocking `time.sleep`, so calling it from inside a coroutine stalls the
    whole event loop for the duration of the pause.
    """
    # flush explicitly: the message has no trailing newline, so on a line-buffered stdout it would
    # otherwise only show up once the sleep is already over
    print('\n >>> Sleeping...', end='', flush=True)
    time.sleep(seconds)
    print('Done sleeping <<<\n')
async def test_example():
    """Walk through the aiomysql cursor API against the `Genes` table and print what each call returns.

    Opens one connection with the credentials from `mypassword` and demonstrates, in order:
    the default cursor (`fetchmany` / `fetchone` / `fetchall` / `scroll` / `rowcount`), `DictCursor`,
    a `DictCursor` subclass with a custom `dict_type`, and the unbuffered `SSCursor`.

    Every cursor is opened with `async with` and the connection is closed in a `finally` clause, so a
    failing query does not leak the cursor or the connection.

    :return: None
    """
    conn = await aiomysql.connect(host=ps.host_name, user=ps.user_name, password=ps.sql_password, db=ps.schema,
                                  loop=loop,  # asyncio event loop instance
                                  )
    try:
        # create the default cursor. Using `Connection.cursor()` for getting cursor for connection.
        # leaving the `async with` block closes the cursor (detaches it from the connection)
        async with conn.cursor() as cur:
            # cur.execute(query: str, args: tuple = None) -- args is a tuple or list of arguments for the sql query
            await cur.execute("SELECT ID FROM Genes LIMIT %s;", (10,))

            # Topic: iterating over the list of parameters
            # INSERT statements are optimized by batching the data, that is using the MySQL multiple rows syntax.
            #     data = [
            #         ('Jane', '555-001'),
            #         ('Joe', '555-001'),
            #         ('John', '555-003'),
            #     ]
            #     stmt = "INSERT INTO employees (name, phone) VALUES (%s, %s)"
            #     await cur.executemany(stmt, data)
            # NOTE: the %s placeholders are left unquoted on purpose; the driver escapes and quotes the values.

            # refer to https://aiomysql.readthedocs.io/en/stable/cursors.html#Cursor.description
            # to understand the printed output
            print(f"{cur.description=}")

            # fetch n results
            r = await cur.fetchmany(size=2)
            print(f"result 1, 2: {r=}")

            # fetch next result
            r = await cur.fetchone()
            print(f"result 3: {r=}")

            r = await cur.fetchmany(2)
            print(f"result 4, 5: {r=}")

            # fetch all (remaining) results
            r = await cur.fetchall()
            print(f"remaining result: {r=}")

            # moving the cursor to the beginning of the result set
            await cur.scroll(0, mode='absolute')  # mode='absolute' | 'relative' (default)
            r = await cur.fetchall()
            print(f"all results: {r=}")

            # The `cur.rowcount` is -1 in case no `cursor.execute()` has been performed on the cursor or
            # the row count of the last operation can't be determined by the interface.
            print(f"\nThe number of rows that has been produced (or affected) by the last sql statement: {cur.rowcount}")
        print(f"---------------------")

        # Topic: return a dictionary
        # create dict cursor
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute("SELECT ID, AKT1 FROM Genes LIMIT %s;", (5,))
            r = await cur.fetchone()
            print(f"result 1: {r=}")
            r = await cur.fetchall()
            print(f"remaining results: {r=}")

            # rows are plain dicts here, so attribute access is expected to fail; this is the point of the demo
            try:
                print(f"{r[0].ID=}")
            except AttributeError:
                print(f"\nX X X X X X Can't access the attributes directly for 'r[0].ID' X X X X X X\n")
        print(f"---------------------")

        # Topic: pass a class with attribute names
        class AttrDict(dict):
            """Dict that can get attribute by dot, and doesn't raise KeyError"""

            def __getattr__(self, name):
                # a missing key yields None instead of raising
                try:
                    return self[name]
                except KeyError:
                    return None

        class AttrDictCursor(aiomysql.DictCursor):
            # row type the cursor builds for every fetched row
            dict_type = AttrDict

        async with conn.cursor(AttrDictCursor) as cur:
            await cur.execute("SELECT ID, AKT1 FROM Genes LIMIT %s;", (5,))
            r = await cur.fetchone()
            print(f"result 1: {r=}")
            r = await cur.fetchall()
            print(f"remaining results: {r=}")
            print(f"{r[0].ID=}")
        print(f"---------------------")

        # Topic: pass a class to cursor
        #
        # class SSCursor
        #   Unbuffered Cursor, mainly useful for queries that return a lot of data, or for connections to remote
        #   servers over a slow network. Instead of copying every row of data into a buffer, this will fetch rows
        #   as needed. The upside of this is that the client uses much less memory, and rows are returned much
        #   faster when traveling over a slow network, or if the result set is very big.
        #   There are limitations, though. The MySQL protocol doesn't support returning the total number of rows,
        #   so the only way to tell how many rows there are is to iterate over every row returned. Also, it
        #   currently isn't possible to scroll backwards, as only the current row is held in memory.
        #   All methods are the same as in Cursor but with different behaviour.
        #
        # class SSDictCursor
        #   An unbuffered cursor, which returns results as a dictionary.
        async with conn.cursor(aiomysql.SSCursor) as cur:
            await cur.execute("SELECT ID, AKT1 FROM Genes LIMIT %s;", (5,))
            r = await cur.fetchone()
            print(f"result 1: {r=}")
    finally:
        # close connection, also when one of the steps above raised
        conn.close()
if __name__ == "__main__":
    # Script entry point: drive the cursor walkthrough to completion on the module-level loop,
    # then report that the run finished.
    demo = test_example()
    loop.run_until_complete(demo)
    print("Done")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiomysql | |
import mypassword as ps | |
# Module-level event loop shared by the demos below.
# `asyncio.get_event_loop()` is deprecated when no current loop exists (DeprecationWarning on Python 3.12,
# RuntimeError on 3.14), so the loop is created explicitly and registered as the current one.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Topic: Pooling | |
# The library provides a connection pool as well as plain Connection objects.
async def go():
    """
    Create a pool of connections to the MySQL database, run one query through it and shut the pool down.

    see https://aiomysql.readthedocs.io/en/stable/pool.html#create_pool for more args
    After creation the pool has `minsize` free connections and can grow up to `maxsize` ones.
    If `minsize` is 0 the pool doesn't create any connection on startup.
    If `maxsize` is 0 then the size of the pool is unlimited (but it recycles used connections of course).

    The pool is closed in a `finally` clause so it is shut down even when the query raises.

    :return: None
    """
    pool = await aiomysql.create_pool(host=ps.host_name, user=ps.user_name, password=ps.sql_password, db=ps.schema,
                                      loop=loop, autocommit=False)
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT ID FROM Genes LIMIT %s;", (10,))
                # print(cur.description)
                # the fetched row is intentionally discarded; only the round trip is demonstrated
                await cur.fetchone()
    finally:
        pool.close()
        await pool.wait_closed()  # wait for actual closing of acquired connection
# Topic: autocommit | |
async def test_example_execute(loop):
    """Recreate the `music_style` table and insert three rows, committing manually.

    The connection is opened with `autocommit=False`, so each batch of statements is followed by an
    explicit `conn.commit()`. The connection is closed in a `finally` clause so it is released even
    when a statement fails.

    :param loop: asyncio event loop instance handed to `aiomysql.connect`
    :return: None
    """
    conn = await aiomysql.connect(host=ps.host_name, user=ps.user_name, password=ps.sql_password, db=ps.schema,
                                  loop=loop, autocommit=False)
    try:
        # a single cursor is enough; a second, never-closed cursor used to be created right before this block
        async with conn.cursor() as cur:
            await cur.execute("DROP TABLE IF EXISTS music_style;")
            await cur.execute(
                """CREATE TABLE music_style (id INT, name VARCHAR(255), PRIMARY KEY (id));"""
            )
            await conn.commit()

            # having autocommit=False requires us to commit the changes manually, but this makes it more
            # efficient to run multiple queries together
            # insert 3 rows one by one
            await cur.execute("INSERT INTO music_style VALUES(1,'heavy metal')")
            await cur.execute("INSERT INTO music_style VALUES(2,'death metal');")
            await cur.execute("INSERT INTO music_style VALUES(3,'power metal');")
            await conn.commit()
    finally:
        conn.close()
if __name__ == '__main__':
    # Run both demos back to back on the module-level event loop. The loop is not looked up a second
    # time between the two runs: `loop` already names the loop that just ran `go()`.
    loop.run_until_complete(go())
    loop.run_until_complete(test_example_execute(loop))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiomysql | |
import mypassword as ps | |
async def test_connection():
    """Try to open a MySQL connection with the credentials from `mypassword` and report the outcome.

    Prints a success message and closes the connection again; if anything in the attempt raises,
    the error text is printed instead of propagating. Returns nothing.
    """
    try:
        credentials = dict(
            host=ps.host_name,
            # port=3306,
            user=ps.user_name,
            password=ps.sql_password,
            db=ps.schema,
        )
        link = await aiomysql.connect(**credentials)
        print("Connection successful!")
        link.close()
    except Exception as err:
        # any failure is reported on stdout rather than raised
        print(f"Connection failed: {err!s}")
if __name__ == "__main__":
    # Script entry point: hand the connectivity check to `asyncio.run`.
    check = test_connection()
    asyncio.run(check)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment