Skip to content

Instantly share code, notes, and snippets.

@max-arnold
Created December 4, 2022 11:35
Show Gist options
  • Save max-arnold/e553cbdaacda12c2aa3464f5226654c0 to your computer and use it in GitHub Desktop.
Save max-arnold/e553cbdaacda12c2aa3464f5226654c0 to your computer and use it in GitHub Desktop.
YDB async Python playground script that triggers the TLI (Transaction Locks Invalidated) scenario
#!/usr/bin/env python
import os
import asyncio
import ydb
# Schema for the single `job` table used by this playground; `id` is the
# primary key.
JOB_DDL = """
CREATE TABLE job (
id String NOT NULL,
status String,
payload JsonDocument,
PRIMARY KEY (id)
);
"""
# Reset step 1: remove every row from `job`.
JOB_INIT1 = """
DELETE FROM job;
"""
# Reset step 2: seed one job with id '123' in status 'new'.
JOB_INIT2 = """
INSERT INTO job (id, status) VALUES ('123', 'new');
"""
# Dump the whole table (used to show the state before and after the run).
JOB_SELECT_ALL = """
SELECT * FROM job;
"""
# Read job '123', but only while it is still in status 'new'.
JOB_SELECT = """
SELECT * FROM job WHERE id = '123' and status = 'new';
"""
# Move job '123' to 'processing', but only if it is still in status 'new'.
JOB_UPDATE = """
UPDATE job SET status = 'processing' WHERE id = '123' AND status = 'new';
"""
def pp(msg, res):
    """Print every result set in ``res``, each one introduced by ``msg``.

    For each result set, ``msg`` is printed on its own line, followed by one
    line per row. A row is rendered as whatever its full slice (``row[:]``)
    evaluates to. Nothing is printed when ``res`` is empty.

    Args:
        msg: Label printed before the rows of each result set.
        res: Iterable of result sets, each exposing a ``rows`` attribute.
    """
    for result_set in res:
        rendered_rows = [row[:] for row in result_set.rows]
        print(msg, *rendered_rows, sep="\n")
async def execute(pool, query, params=None, commit_tx=True):
    """Run ``query`` in its own transaction through the pool's retry helper.

    The query is prepared on the session handed out by
    ``pool.retry_operation`` and executed in a ``SerializableReadWrite``
    transaction.

    Args:
        pool: Session pool providing ``retry_operation``.
        query: YQL text to prepare and execute.
        params: Optional query parameters; a falsy value is replaced by ``{}``.
        commit_tx: Forwarded to the transaction's ``execute`` call.

    Returns:
        Whatever the transaction's ``execute`` call returns (passed to ``pp``
        as an iterable of result sets elsewhere in this script).
    """
    params = params or {}

    async def do_execute(session):
        prepared_query = await session.prepare(query)
        # SerializableReadWrite, OnlineReadOnly, StaleReadOnly
        tx = session.transaction(ydb.SerializableReadWrite())
        # Request timeout of 3 and operation timeout of 2 (presumably seconds).
        settings = (
            ydb.BaseRequestSettings()
            .with_timeout(3)
            .with_operation_timeout(2)
        )
        return await tx.execute(
            prepared_query,
            params,
            commit_tx=commit_tx,
            settings=settings,
        )

    return await pool.retry_operation(do_execute)
async def coro1(pool):
    """Run the "fast" competing transaction: select, update, commit.

    Checks out a session, opens an explicit transaction, reads the job with
    ``JOB_SELECT``, updates it with ``JOB_UPDATE`` and commits, printing a
    trace line before every step. Unlike ``coro2`` it does not pause between
    the read and the write.

    Args:
        pool: Session pool providing ``checkout()``.
    """
    print("Coro 1")
    async with pool.checkout() as session:
        async with session.transaction() as tx:
            print("Coro 1 begin")
            await tx.begin()

            print("Coro 1 select")
            selected = await tx.execute(JOB_SELECT)
            pp("Coro 1 select res:", selected)

            print("Coro 1 update")
            await tx.execute(JOB_UPDATE)

            print("Coro 1 commit")
            await tx.commit()
    print("Coro 1 end")
async def coro2(pool):
    """Run the "slow" competing transaction: select, sleep, update, commit.

    Performs the same steps as ``coro1`` but sleeps for 0.5 s between the
    read and the write. ``main`` runs both coroutines concurrently, so the
    pause presumably lets the other transaction modify the row first and
    provokes the conflict (the "TLI scenario") this script demonstrates.

    Args:
        pool: Session pool providing ``checkout()``.
    """
    print("Coro 2")
    async with pool.checkout() as session:
        async with session.transaction() as tx:
            print("Coro 2 begin")
            await tx.begin()

            print("Coro 2 select")
            selected = await tx.execute(JOB_SELECT)
            pp("Coro 2 select res:", selected)

            # Hold the transaction open after the read, before the write.
            print("Coro 2 sleep")
            await asyncio.sleep(0.5)

            print("Coro 2 update")
            await tx.execute(JOB_UPDATE)

            print("Coro 2 commit")
            await tx.commit()
    print("Coro 2 end")
async def main():
    """Set up the ``job`` table, then race ``coro1`` against ``coro2``.

    Steps:
      1. Connect using ``YDB_ENDPOINT`` / ``YDB_DATABASE`` from the
         environment and wait up to 5 s for the driver.
      2. Create the table, delete all rows and insert the seed job.
      3. Print the table, run both coroutines concurrently, print the table
         again.

    An exception raised by either coroutine propagates out of
    ``asyncio.gather`` and therefore out of this function.
    """
    async with ydb.aio.Driver(
        endpoint=os.getenv('YDB_ENDPOINT'),
        database=os.getenv('YDB_DATABASE'),
    ) as driver:
        await driver.wait(timeout=5, fail_fast=True)

        async with ydb.aio.SessionPool(driver, size=10) as pool:
            # Schema creation and data reset share one checked-out session.
            async with pool.checkout() as session:
                await session.execute_scheme(JOB_DDL)
                await session.transaction().execute(JOB_INIT1, commit_tx=True)
                await session.transaction().execute(JOB_INIT2, commit_tx=True)

            pp("After init:", await execute(pool, JOB_SELECT_ALL))

            await asyncio.gather(coro1(pool), coro2(pool))

            pp("After coro:", await execute(pool, JOB_SELECT_ALL))
if __name__ == "__main__":
    # Export these env vars to run the script:
    #
    # export YDB_ENDPOINT=...
    # export YDB_DATABASE=...
    # export YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS=...
    #
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards; calling asyncio.get_event_loop() with no
    # running loop is deprecated.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment