Created
February 11, 2023 05:27
-
-
Save csm10495/65ca17fdb758ecec69c2bb8532a78b3b to your computer and use it in GitHub Desktop.
cosmos2json.py - A quick and dirty script to run queries against a cosmos db's container
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Simple script to run a simple query against a Cosmos DB container in Azure.
Can use it to dump a db to JSON.. if ya need that for some reason.

MIT License - Charles Machalow

# pip install azure-cosmos
# pip install azure-identity
'''
import argparse
import contextlib
import io
import json
import sys

from azure.cosmos import CosmosClient


class OutputJSONStreamer:
    """
    A hacky way of streaming the output of a query in a json-y format.

    Items are written to ``output`` as they arrive instead of being collected
    into one big list first. Use it as a context manager: the document is
    only completed when the ``with`` block exits. What ends up in ``output``
    depends on how many items were processed:

    * no items    -> ``{}``
    * one item    -> that item on its own (not wrapped in a list)
    * two or more -> a JSON array with one element per item

    Every item is serialized with ``json.dumps(item, indent=4)``.
    """

    def __init__(self, output: io.TextIOBase):
        """
        :param output: writable text stream that receives the JSON. Only its
            ``write`` method is used; the stream is never closed here.
        """
        self.output = output
        # Number of items accepted so far.
        self._count = 0
        # Serialized form of the first item. It is held back because, until a
        # second item shows up, we cannot know whether to emit a bare object
        # or to open an array.
        self._first_encoded = None

    def __enter__(self):
        return self

    def __exit__(self, *args, **kargs):
        """
        Finish the document based on how many items were processed.

        Returns None, so an exception raised inside the ``with`` block is not
        suppressed.
        """
        if self._count == 0:
            self.output.write('{}')
        elif self._count == 1:
            self.output.write(self._first_encoded)
        else:
            # The array was opened by process(); all that is left is to
            # close it.
            self.output.write("\n]")

    def process(self, item):
        """
        Add one item to the output.

        :param item: any object ``json.dumps`` can serialize.
        :raises TypeError: (from ``json.dumps``) if the item cannot be
            serialized; in that case nothing is written for it.
        """
        # Serialize before touching the stream or the counters so that a bad
        # item cannot leave a dangling separator in the output.
        encoded = json.dumps(item, indent=4)

        if self._count == 0:
            self._first_encoded = encoded
        else:
            if self._count == 1:
                # Second item: now we know the output is an array, so open
                # it and flush the item that was held back.
                self.output.write("[\n")
                self.output.write(self._first_encoded)
                self._first_encoded = None
            self.output.write(",\n")
            self.output.write(encoded)

        self._count += 1


def _parse_args(argv=None):
    """
    Parse the command line options for the script.

    :param argv: list of argument strings; None makes argparse read
        ``sys.argv[1:]``.
    :return: the ``argparse.Namespace`` with endpoint, key, database,
        container, query and output attributes.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--endpoint", required=True, help="Cosmos DB endpoint")
    parser.add_argument("--key", required=True, help="Cosmos DB key")
    parser.add_argument("--database", required=True, help="Cosmos database")
    parser.add_argument("--container", required=True, help="Cosmos container")
    parser.add_argument("--query", required=True, help="query to execute.. for example: 'select * from c'")
    parser.add_argument("-o", "--output", required=False, default=None, help="If given a path to send this output to (otherwise goes to stdout)")
    return parser.parse_args(argv)


def main(argv=None):
    """
    Run the query given on the command line and stream the results as JSON.

    Results go to the file named by ``--output`` when one is given, otherwise
    to stdout.

    :param argv: list of argument strings; None makes argparse read
        ``sys.argv[1:]``.
    """
    args = _parse_args(argv)

    with CosmosClient(url=args.endpoint, credential=args.key) as cosmos:
        db = cosmos.get_database_client(args.database)
        container = db.get_container_client(args.container)
        info = container.query_items(args.query, enable_cross_partition_query=True)

        # A real file must be closed when we are done, stdout must not be:
        # nullcontext hands stdout through without closing it on exit.
        if args.output:
            sink = open(args.output, 'w')
        else:
            sink = contextlib.nullcontext(sys.stdout)

        # Exit order is right-to-left: the streamer finishes the JSON
        # document first, then the file (if any) is closed.
        with sink as output, OutputJSONStreamer(output) as streamer:
            for q in info:
                streamer.process(q)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment