Skip to content

Instantly share code, notes, and snippets.

@Mulugruntz
Last active February 26, 2020 10:38
Show Gist options
  • Save Mulugruntz/97d904db4446428d0815dd626b812cbd to your computer and use it in GitHub Desktop.
Save Mulugruntz/97d904db4446428d0815dd626b812cbd to your computer and use it in GitHub Desktop.
For EdgeDB. The asynchronous function `insert_if_not_exists` does exactly what it says.
import asyncio
from dataclasses import dataclass
from typing import Any, List
import edgedb
from edgedb import AsyncIOConnection
@dataclass
class EdgeObjectProperty:
    """One property of an EdgeDB object, as consumed by ``insert_if_not_exists``.

    Attributes:
        name: Property name. It is also used as the name of the query
            parameter that carries ``value``.
        type: Name of the EdgeQL type the parameter is cast to
            (for example ``'str'`` or ``'float64'``).
        value: Value bound to the query parameter.
    """

    name: str
    type: str
    value: Any
async def insert_if_not_exists(
    connection: AsyncIOConnection,
    table_name: str,
    values: List[EdgeObjectProperty],
) -> Any:
    """Insert a ``table_name`` object unless one with the same values exists.

    The generated query selects ``true`` only when no ``table_name`` object
    matches every property in ``values``, and performs the ``INSERT`` once
    for each element of that (empty or one-element) set.

    Only the property *values* are sent as query parameters. ``table_name``
    and each property's ``name`` and ``type`` are interpolated directly into
    the query text, so they must come from trusted code, never from
    untrusted input.

    Args:
        connection: Connection used to open the transaction and run the query.
        table_name: Name of the object type to look up and insert into.
        values: Properties that identify the object and are set on insert.
            Must not be empty.

    Returns:
        Whatever ``connection.fetchall`` returns for the query.

    Raises:
        ValueError: If ``values`` is empty; the query would otherwise contain
            a ``FILTER`` clause with no expression.
    """
    # Fail fast instead of sending a malformed query to the server.
    if not values:
        raise ValueError("values must contain at least one property")

    select_filters_expr = ' AND '.join(
        f"({table_name}.{value.name} = <{value.type}>${value.name})" for value in values
    )
    insert_filters_expr = ', '.join(
        f"{value.name} := <{value.type}>${value.name}" for value in values
    )
    # Each property name doubles as the query parameter name.
    kwargs = {value.name: value.value for value in values}
    query = f"""
FOR _ IN {{
(SELECT true FILTER (NOT EXISTS (
SELECT {table_name}
FILTER {select_filters_expr}
)))
}}
UNION (
INSERT {table_name} {{
{insert_filters_expr}
}}
)
"""
    # Errors propagate to the caller unchanged; no catch-and-re-raise needed.
    async with connection.transaction():
        return await connection.fetchall(query, **kwargs)
async def main():
    """Demo: connect to the ``edgedb`` host and insert two sample objects."""
    # (object type, properties) pairs, inserted in this order.
    samples = [
        (
            'Text',
            [EdgeObjectProperty(name='body', type='str', value='The cake is a lie. Haha.')],
        ),
        (
            'Item',
            [
                EdgeObjectProperty(name='name', type='str', value='Chair'),
                EdgeObjectProperty(name='price', type='float64', value=42.23),
            ],
        ),
    ]

    created_pool = await edgedb.create_async_pool(
        host='edgedb', port=5656, user='edgedb', database='edgedb'
    )
    # One connection is acquired from the pool and reused for every insert.
    async with created_pool as pool, pool.acquire() as connection:
        for object_type, properties in samples:
            await insert_if_not_exists(
                connection=connection,
                table_name=object_type,
                values=properties,
            )
# Run the demo only when this file is executed as a script, so importing the
# module (e.g. to reuse insert_if_not_exists) does not open a database pool.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment