Created
December 31, 2019 11:48
-
-
Save Satak/b26ec62adafa71c62cee9db1cae58a3c to your computer and use it in GitHub Desktop.
Example how to use Azure CosmosDB Python SDK
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from os import getenv | |
from azure.cosmos import CosmosClient, PartitionKey | |
import uuid | |
AZ_DB_ENDPOINT = getenv('AZ_DB_ENDPOINT') | |
AZ_DB_PRIMARYKEY = getenv('AZ_DB_PRIMARYKEY') | |
DATABASE_NAME = 'Tasks' | |
CONTAINER_NAME = 'Items' | |
AZ_DB_CLIENT = CosmosClient(AZ_DB_ENDPOINT, AZ_DB_PRIMARYKEY) | |
DATABASE = AZ_DB_CLIENT.create_database_if_not_exists(id=DATABASE_NAME) | |
CONTAINER = DATABASE.create_container_if_not_exists( | |
id=CONTAINER_NAME, | |
partition_key=PartitionKey(path='/category'), | |
offer_throughput=400 | |
) | |
def get_items(item_id=None): | |
query = 'SELECT * FROM c' | |
if item_id: | |
query += ' WHERE c.id = @item_id' | |
params = { | |
'query': query, | |
'parameters': [{"name": "@item_id", "value": str(item_id)}], | |
'enable_cross_partition_query': True | |
} | |
res = CONTAINER.query_items(**params) | |
return list(res) | |
def create_item(item): | |
item['id'] = str(uuid.uuid4()) | |
CONTAINER.create_item(body=item) | |
return item | |
def delete_item(item_id): | |
res = get_items(item_id) | |
if not res: | |
return False | |
item = res[0] | |
CONTAINER.delete_item(item, partition_key=item['category']) | |
return True | |
def complete_item(item_id, is_complete=True): | |
res = get_items(item_id) | |
if not res: | |
return False | |
item = res[0] | |
new_item = item | |
new_item['isComplete'] = is_complete | |
CONTAINER.replace_item(item, new_item) | |
return True |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment