Created
December 4, 2022 11:35
-
-
Save max-arnold/e553cbdaacda12c2aa3464f5226654c0 to your computer and use it in GitHub Desktop.
YDB async Python playground script to trigger the TLI scenario
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import asyncio | |
import ydb | |
JOB_DDL = """ | |
CREATE TABLE job ( | |
id String NOT NULL, | |
status String, | |
payload JsonDocument, | |
PRIMARY KEY (id) | |
); | |
""" | |
JOB_INIT1 = """ | |
DELETE FROM job; | |
""" | |
JOB_INIT2 = """ | |
INSERT INTO job (id, status) VALUES ('123', 'new'); | |
""" | |
JOB_SELECT_ALL = """ | |
SELECT * FROM job; | |
""" | |
JOB_SELECT = """ | |
SELECT * FROM job WHERE id = '123' and status = 'new'; | |
""" | |
JOB_UPDATE = """ | |
UPDATE job SET status = 'processing' WHERE id = '123' AND status = 'new'; | |
""" | |
def pp(msg, res): | |
for rset in res: | |
print(msg, *[row[::] for row in rset.rows], sep="\n") | |
async def execute(pool, query, params=None, commit_tx=True): | |
params = params or {} | |
async def do_execute(session): | |
prepared_query = await session.prepare(query) | |
# SerializableReadWrite, OnlineReadOnly, StaleReadOnly | |
return await session.transaction(ydb.SerializableReadWrite()).execute( | |
prepared_query, | |
params, | |
commit_tx=commit_tx, | |
settings=ydb.BaseRequestSettings().with_timeout(3).with_operation_timeout(2) | |
) | |
return await pool.retry_operation(do_execute) | |
async def coro1(pool): | |
print("Coro 1") | |
async with pool.checkout() as session: | |
async with session.transaction() as tx: | |
print("Coro 1 begin") | |
await tx.begin() | |
print("Coro 1 select") | |
pp("Coro 1 select res:", await tx.execute(JOB_SELECT)) | |
print("Coro 1 update") | |
await tx.execute(JOB_UPDATE) | |
print("Coro 1 commit") | |
await tx.commit() | |
print("Coro 1 end") | |
async def coro2(pool): | |
print("Coro 2") | |
async with pool.checkout() as session: | |
async with session.transaction() as tx: | |
print("Coro 2 begin") | |
await tx.begin() | |
print("Coro 2 select") | |
pp("Coro 2 select res:", await tx.execute(JOB_SELECT)) | |
print("Coro 2 sleep") | |
await asyncio.sleep(0.5) | |
print("Coro 2 update") | |
await tx.execute(JOB_UPDATE) | |
print("Coro 2 commit") | |
await tx.commit() | |
print("Coro 2 end") | |
async def main(): | |
async with ydb.aio.Driver( | |
endpoint=os.getenv('YDB_ENDPOINT'), | |
database=os.getenv('YDB_DATABASE'), | |
) as driver: | |
await driver.wait(timeout=5, fail_fast=True) | |
async with ydb.aio.SessionPool(driver, size=10) as pool: | |
async with pool.checkout() as session: | |
await session.execute_scheme(JOB_DDL) | |
await session.transaction().execute(JOB_INIT1, commit_tx=True) | |
await session.transaction().execute(JOB_INIT2, commit_tx=True) | |
data = await execute(pool, JOB_SELECT_ALL) | |
pp("After init:", await execute(pool, JOB_SELECT_ALL)) | |
await asyncio.gather(coro1(pool), coro2(pool)) | |
pp("After coro:", await execute(pool, JOB_SELECT_ALL)) | |
if __name__ == "__main__": | |
# Export these env vars to run the script: | |
# | |
# export YDB_ENDPOINT=... | |
# export YDB_DATABASE=... | |
# export YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=... | |
# | |
asyncio.get_event_loop().run_until_complete(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment