
@sjwaight
Last active October 27, 2020 16:17
How to pass an array of items from Python to a Cosmos DB Stored Procedure (v3 Python SDK or earlier).
import os
import uuid

# v3 Python SDK - note that v4 changed the API and stored procedures now live in the azure.cosmos.scripts module.
import azure.cosmos.documents as documents
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors

# Connection settings are read from environment variables.
COSMOS_HOST = os.environ['COSMOS_HOST']
MASTER_KEY = os.environ['MASTER_KEY']
DATABASE_ID = os.environ['DATABASE_ID']
COLLECTION_ID = os.environ['COLLECTION_ID']

database_link = 'dbs/' + DATABASE_ID
collection_link = database_link + '/colls/' + COLLECTION_ID
# Use sample bulk SP from here: https://github.com/Azure/azure-cosmosdb-js-server/blob/master/samples/stored-procedures/BulkImport.js
sproc_link = collection_link + '/sprocs/bulkImport'


def create_cosmos_entity(jobid, test):
    # Build one sample document to import.
    return {
        'JobID': jobid,
        'Test': test
    }


def main():
    # Build 30 sample documents.
    new_docs = []
    counter = 0
    while counter < 30:
        new_docs.append(create_cosmos_entity(str(uuid.uuid4()), counter))
        counter += 1

    if len(new_docs) > 0 and counter < 100:
        client = cosmos_client.CosmosClient(COSMOS_HOST, {'masterKey': MASTER_KEY})
        # The key here is to include [] around 'new_docs' otherwise call fails!
        client.ExecuteStoredProcedure(sproc_link, [new_docs])


if __name__ == '__main__':
    main()
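Why the brackets matter: as far as I understand it, the list passed to ExecuteStoredProcedure is treated as the stored procedure's argument list, one element per parameter. The sketch below imitates that mapping in plain Python. `bulk_import_stub` and `call_sproc` are hypothetical stand-ins for the server-side `bulkImport(docs)` function and the SDK call; neither is part of the SDK.

```python
def bulk_import_stub(docs=None, *extra):
    """Hypothetical stand-in for the server-side bulkImport(docs) function."""
    return docs


def call_sproc(params):
    # Assumption: each element of params becomes one positional argument.
    return bulk_import_stub(*params)


new_docs = [{'JobID': 'a', 'Test': 0}, {'JobID': 'b', 'Test': 1}]

wrapped = call_sproc([new_docs])  # docs receives the whole list
unwrapped = call_sproc(new_docs)  # docs receives only the first document
```

With the extra brackets the procedure sees one argument that is the full array. Without them, each document is spread into its own parameter, so `docs` is no longer an array.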
@rayriju

rayriju commented Jul 19, 2019

If anyone can help: I'm new to Cosmos DB and JavaScript. I have a requirement to upsert data in bulk, and I found this JavaScript stored procedure for that:
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

function bulkImport(docs, upsert) {
var collection = getContext().getCollection();
var collectionLink = collection.getSelfLink();

// The count of imported docs, also used as current doc index.
var count = 0;
var errorCodes = { CONFLICT: 409 };

// Validate input.
if (!docs) throw new Error("The array is undefined or null.");

var docsLength = docs.length;
if (docsLength == 0) {
    getContext().getResponse().setBody(0);
    return;
}

// Call the create API to create a document.
tryCreate(docs[count], callback);

// Note that there are 2 exit conditions:
// 1) The createDocument request was not accepted.
// In this case the callback will not be called, we just call
// setBody and we are done.
// 2) The callback was called docs.length times.
// In this case all documents were created and we don’t need to call
// tryCreate anymore. Just call setBody and we are done.
function tryCreate(doc, callback) {
    var isAccepted = collection.createDocument(collectionLink, doc, { disableAutomaticIdGeneration : true}, callback);

    // If the request was accepted, callback will be called.
    // Otherwise report current count back to the client,
    // which will call the script again with remaining set of docs.
    if (!isAccepted) getContext().getResponse().setBody(count); 
}
        
// To replace the document, first issue a query to find it and then call replace.
function tryReplace(doc, callback) {
    var parsedDoc = JSON.parse(doc);
    retrieveDoc(parsedDoc, null, function(retrievedDocs){
        var isAccepted = collection.replaceDocument(retrievedDocs[0]._self, parsedDoc, callback);
        if (!isAccepted) getContext().getResponse().setBody(count);
    });
}

function retrieveDoc(doc, continuation, callback) {
    var query = "select * from root r where r.id = '" + doc.id + "'";
    var requestOptions = { continuation : continuation }; 
    var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function(err, retrievedDocs, responseOptions) {
        if (err) throw err;
        
        if (retrievedDocs.length > 0) {
            callback(retrievedDocs);
        } else if (responseOptions.continuation) {
            retrieveDoc(doc, responseOptions.continuation, callback);            
        } else {
            throw "Error in retrieving document: " + doc.id;
        }
    });
    
    if (!isAccepted) getContext().getResponse().setBody(count); 
}

// This is called when collection.createDocument is done in order to
// process the result.
function callback(err, doc, options) {
    if (err) {
        // Replace the document if status code is 409 and upsert is enabled
        if(upsert && err.number == errorCodes.CONFLICT) {
            return tryReplace(docs[count], callback);
        } else {
            throw err;
        }
    }
    
    // One more document has been inserted, increment the count.
    count++;
    if (count >= docsLength) {
        // If we created all documents, we are done. Just set the response.
        getContext().getResponse().setBody(count);
    } else {
        // Create next document.
        tryCreate(docs[count], callback);
    }
}

}
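
The comments in the procedure say that when a request is not accepted, it reports the current count and expects the client to call again with the remaining documents. A minimal client-side sketch of that loop follows. `import_all` and the `execute` callable are hypothetical names, and the sketch assumes the SDK call returns the number passed to setBody.

```python
def import_all(execute, docs, upsert=False):
    """Call the stored procedure until every document has been processed.

    `execute` is a hypothetical callable that takes the params list and
    returns the count the stored procedure reported via setBody(count).
    """
    imported = 0
    while imported < len(docs):
        # Send only the documents that have not been imported yet.
        count = execute([docs[imported:], upsert])
        if count <= 0:
            # Stop instead of looping forever if nothing was imported.
            raise RuntimeError('No progress after %d documents' % imported)
        imported += count
    return imported
```

With the v3 SDK, `execute` could be something like `lambda params: client.ExecuteStoredProcedure(sproc_link, params)`, assuming that call returns the response body.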

================================================================
I'm trying to call it from Python as either
client.ExecuteStoredProcedure(sproc_linkOut, [new_docs, True], options=options)
or
client.ExecuteStoredProcedure(sproc_linkOut, [new_docs], options=options)

I also get the error below, which I did not expect because the procedure is written to handle error 409. Can you please help?
HTTPFailure: Status code: 400 Sub-status: 409
{"code":"BadRequest","message":"Message: {"Errors":["Encountered exception while executing function. Exception = Error: {\"Errors\":[\"Resource with specified id or name already exists.\"]}\r\nStack trace: Error: {\"Errors\":[\"Resource with specified id or name already exists.\"]}

@sjwaight
Author

@rayriju - I suggest posting your question on Stack Overflow and tagging it for Azure Cosmos DB.

@xmseraph

xmseraph commented Jul 17, 2020

I got this error when I ran it in my environment:

AttributeError: 'CosmosClient' object has no attribute 'ExecuteStoredProcedure'

azure-core-1.1.1 and azure-cosmos-4.0.0b6 are installed.

@sjwaight
Author

The v4 Cosmos DB Python SDK changed a bunch of APIs. Stored Procedures now live in the azure.cosmos.scripts module - see the docs for more information. https://azuresdkdocs.blob.core.windows.net/$web/python/azure-cosmos/4.0.0/azure.cosmos.html#azure.cosmos.scripts.ScriptsProxy.execute_stored_procedure
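
For reference, a rough sketch of the same call with the v4 SDK, based on the `execute_stored_procedure` method in the linked docs. The helper names `connect` and `bulk_import` are made up for this example, and it assumes the `bulkImport` procedure is already registered on the container.

```python
def connect(host, key, database_id, container_id):
    """Return a v4 container client (requires the azure-cosmos v4 package)."""
    # Imported here so the rest of the sketch loads without the SDK installed.
    from azure.cosmos import CosmosClient
    client = CosmosClient(host, credential=key)
    database = client.get_database_client(database_id)
    return database.get_container_client(container_id)


def bulk_import(container, docs, partition_key):
    # The docs list is still wrapped in another list: params holds the
    # stored procedure's arguments, and docs is the first argument.
    return container.scripts.execute_stored_procedure(
        sproc='bulkImport',
        partition_key=partition_key,
        params=[docs],
    )
```

Usage would look like `bulk_import(connect(COSMOS_HOST, MASTER_KEY, DATABASE_ID, COLLECTION_ID), new_docs, some_partition_key_value)`, where the partition key value is whatever the documents in the batch share.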

@rogernieto

Hey @sjwaight, good post.
I'm trying to replicate it because I need to do the same bulk import.
I'm receiving the same error that @yanshen1982 used to have, but my partition key value is an attribute of the JSON.

What can be a good workaround for this?

Thanks!

@sjwaight
Author

Recommend having a read through this Stack Overflow question and the accepted answer: https://stackoverflow.com/questions/48798523/azure-cosmos-db-asking-for-partition-key-for-stored-procedure
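
A stored procedure runs within the scope of a single partition key value, so when the key is an attribute of each document, one possible workaround is to group the documents by that attribute and call the procedure once per group. The sketch below shows only the grouping. `execute` is a hypothetical wrapper around the SDK call, and passing the key as `{'partitionKey': value}` in the v3 options is my assumption, not something confirmed in this thread.

```python
from collections import defaultdict


def group_by_partition_key(docs, key_field):
    """Group documents by the value of their partition key attribute."""
    groups = defaultdict(list)
    for doc in docs:
        groups[doc[key_field]].append(doc)
    return dict(groups)


def bulk_import_partitioned(execute, docs, key_field):
    """Run the stored procedure once per partition key value.

    `execute(params, partition_key)` is a hypothetical wrapper, e.g. for v3:
    lambda params, pk: client.ExecuteStoredProcedure(
        sproc_link, params, {'partitionKey': pk})
    """
    results = {}
    for pk, batch in group_by_partition_key(docs, key_field).items():
        results[pk] = execute([batch], pk)
    return results
```

Each call then only touches documents that belong to the partition it was scoped to.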
