Skip to content

Instantly share code, notes, and snippets.

@Satak
Created December 31, 2019 11:48
Show Gist options
  • Save Satak/b26ec62adafa71c62cee9db1cae58a3c to your computer and use it in GitHub Desktop.
Save Satak/b26ec62adafa71c62cee9db1cae58a3c to your computer and use it in GitHub Desktop.
Example of how to use the Azure Cosmos DB Python SDK
from os import getenv
from azure.cosmos import CosmosClient, PartitionKey
import uuid
# Account endpoint and primary key are read from environment variables;
# getenv() yields None for a variable that is not set.
AZ_DB_ENDPOINT = getenv('AZ_DB_ENDPOINT')
AZ_DB_PRIMARYKEY = getenv('AZ_DB_PRIMARYKEY')

# Names of the database and container the functions below operate on.
DATABASE_NAME = 'Tasks'
CONTAINER_NAME = 'Items'

# NOTE: the three statements below run when this module is imported: they
# build the client and make sure the database and container exist.
AZ_DB_CLIENT = CosmosClient(AZ_DB_ENDPOINT, AZ_DB_PRIMARYKEY)
DATABASE = AZ_DB_CLIENT.create_database_if_not_exists(id=DATABASE_NAME)
CONTAINER = DATABASE.create_container_if_not_exists(
    id=CONTAINER_NAME,
    # Items are partitioned by their 'category' property.
    partition_key=PartitionKey(path='/category'),
    offer_throughput=400,
)
def get_items(item_id=None):
    """Return items from the container as a list.

    Args:
        item_id: Optional id to filter on. When truthy, only items whose
            ``id`` equals ``str(item_id)`` are returned. When omitted or
            falsy, no filter is applied and every item is returned.

    Returns:
        list: The fully materialised query results (possibly empty).
    """
    query = 'SELECT * FROM c'
    params = {
        'query': query,
        # Lookups are by id rather than by the container's partition key
        # path, so the query is allowed to span partitions.
        'enable_cross_partition_query': True,
    }
    if item_id:
        # Bind the id as a query parameter rather than formatting it into
        # the SQL text, and only send it when the WHERE clause uses it.
        params['query'] = query + ' WHERE c.id = @item_id'
        params['parameters'] = [{"name": "@item_id", "value": str(item_id)}]
    res = CONTAINER.query_items(**params)
    return list(res)
def create_item(item):
    """Store ``item`` in the container under a freshly generated id.

    Args:
        item: Mapping describing the item. It is modified in place: its
            ``'id'`` key is set to a new random UUID4 string, replacing
            any id it already carried.

    Returns:
        The same ``item`` object that was passed in, now including ``'id'``.
    """
    generated_id = uuid.uuid4()
    item['id'] = str(generated_id)
    CONTAINER.create_item(body=item)
    return item
def delete_item(item_id):
    """Delete the item whose id matches ``item_id``.

    Args:
        item_id: Id of the item to delete.

    Returns:
        bool: True if a matching item was found and deleted; False if
        ``item_id`` is empty/falsy or no item matched.
    """
    # get_items() treats a falsy id as "no filter" and returns every item,
    # so without this guard an empty id would delete an arbitrary item.
    if not item_id:
        return False
    matches = get_items(item_id)
    if not matches:
        return False
    item = matches[0]
    # The delete call is given the item's 'category' value as partition key.
    CONTAINER.delete_item(item, partition_key=item['category'])
    return True
def complete_item(item_id, is_complete=True):
    """Set the ``isComplete`` flag on the item whose id matches ``item_id``.

    Args:
        item_id: Id of the item to update.
        is_complete: Value to store under the item's ``'isComplete'`` key.

    Returns:
        bool: True if a matching item was found and replaced; False if
        ``item_id`` is empty/falsy or no item matched.
    """
    # get_items() treats a falsy id as "no filter" and returns every item,
    # so without this guard an empty id would update an arbitrary item.
    if not item_id:
        return False
    matches = get_items(item_id)
    if not matches:
        return False
    item = matches[0]
    item['isComplete'] = is_complete
    # The fetched document both identifies the item to replace and, with the
    # flag updated, serves as the new body.
    CONTAINER.replace_item(item, item)
    return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment