Skip to content

Instantly share code, notes, and snippets.

@rndazurescript
Last active September 1, 2020 15:10
Show Gist options
  • Save rndazurescript/1d62c60ae270c6361d6e685ec0ef880d to your computer and use it in GitHub Desktop.
Save rndazurescript/1d62c60ae270c6361d6e685ec0ef880d to your computer and use it in GitHub Desktop.
Accessing storage account in parallel using managed identity
# The following sample shows how to run async authentication and access to storage
# account. This sample suffers from the IMDS limit of 5 concurrent requests but
# the python SDK has baked in retry policies
import asyncio
import logging
import os
from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob.aio import BlobServiceClient
import logging
# Based on https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_authentication_async.py
# Configure target storage account in environment. Something like
# export SAMPLE_STORAGE_ACCOUNT_URL="https://myaccount.blob.core.windows.net/"
account_url = os.environ["SAMPLE_STORAGE_ACCOUNT_URL"]
# Enable debug logs to observe requests to IMDS
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# urllib3 emits its records under the "urllib3.*" logger names.
# "requests.packages.urllib3" is only a module alias kept by requests and never
# receives records, so configure the real logger name.
requests_log = logging.getLogger("urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
class AuthenticationSampleAsync(object):
    """A single async worker that authenticates and lists storage containers.

    Every call builds its own ``DefaultAzureCredential``, so no token is
    shared between workers.
    """

    def __init__(self, counter, account_url):
        # ``counter`` only tags log lines so concurrent workers can be told apart.
        self.counter = counter
        self.account_url = account_url

    async def authentication_with_azure_active_directory_async(self):
        """Create a credential and client, list every container, log the names."""
        logging.info("authentication_with_azure_active_directory_async")
        # Skipping environment credentials to speed up the process of using
        # user defined managed identity
        async with DefaultAzureCredential(exclude_environment_credential=True) as credential:
            async with BlobServiceClient(self.account_url, credential=credential) as service:
                containers = [item async for item in service.list_containers()]
        for container in containers:
            logging.info(f"{self.counter} --- {container.name}")
async def runAsync(counter, account_url):
    """Build the sample worker tagged with *counter* and run it to completion."""
    await AuthenticationSampleAsync(
        counter, account_url
    ).authentication_with_azure_active_directory_async()
if __name__ == '__main__':
    print("Press ctrl+c to exit")
    # Create and install the event loop before scheduling work. Calling
    # asyncio.ensure_future() while no loop exists relies on the implicit loop
    # creation of asyncio.get_event_loop(), which is deprecated in recent Python
    # versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    for i in range(48):
        loop.create_task(runAsync(i, account_url))
    # Keep running until interrupted so the debug output can be inspected.
    loop.run_forever()
# The following sample shows how to run async authentication and access to a storage
# account while forcing the acquisition of a single access token up front. That token
# is then reused by all concurrent tasks, so they are not affected by the IMDS limit
# of 5 concurrent requests
import asyncio
import os
from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob.aio import BlobServiceClient
import logging
# Based on https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_authentication_async.py
# Configure target storage account in environment. Something like
# export SAMPLE_STORAGE_ACCOUNT_URL="https://myaccount.blob.core.windows.net/"
account_url = os.environ["SAMPLE_STORAGE_ACCOUNT_URL"]
# Enable debug logs to observe requests to IMDS
logging.basicConfig(filename='async_storage_share_token.log')
logging.getLogger().setLevel(logging.DEBUG)
# urllib3 emits its records under the "urllib3.*" logger names.
# "requests.packages.urllib3" is only a module alias kept by requests and never
# receives records, so configure the real logger name.
requests_log = logging.getLogger("urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
class AuthenticationSampleAsync(object):
    """Async worker that lists containers through a caller-supplied credential.

    The credential is passed in rather than created here, so every worker
    built from the same credential object shares its cached token.
    """

    def __init__(self, counter, account_url, credential):
        # ``counter`` tags log lines; ``credential`` is shared, not owned.
        self.counter = counter
        self.account_url = account_url
        self.credential = credential

    async def authentication_with_azure_active_directory_async(self):
        """List every container with the shared credential and log the names."""
        logging.info("authentication_with_azure_active_directory_async")
        async with BlobServiceClient(self.account_url, credential=self.credential) as service:
            names = [entry async for entry in service.list_containers()]
        for entry in names:
            logging.info(f"{self.counter} --- {entry.name}")
async def runAsync(counter, account_url, credential):
    """Run one worker tagged with *counter* using the shared *credential*."""
    worker = AuthenticationSampleAsync(counter, account_url, credential)
    await worker.authentication_with_azure_active_directory_async()
async def initCredential(account_url, credential):
    """Make *credential* acquire and cache an access token before the workers run.

    A container listing is started and abandoned after its first item. That is
    enough to force the client to authenticate; no container data is kept.
    """
    async with BlobServiceClient(account_url, credential=credential) as warmup:
        listing = warmup.list_containers()
        async for _first in listing:
            break
if __name__ == '__main__':
    # Use ONE event loop for the whole run. asyncio.run() creates, runs and
    # closes its own private loop. Warming the token up that way, and then
    # scheduling the workers on a different loop, leaves the shared async
    # credential (and any HTTP session it opened during warm-up) tied to a
    # closed loop, so later token refreshes can fail.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Skipping environment credentials to speed up the process of using
    # user defined managed identity
    credential = DefaultAzureCredential(exclude_environment_credential=True)
    loop.run_until_complete(initCredential(account_url, credential))
    print("Press ctrl+c to exit")
    for i in range(48):
        loop.create_task(runAsync(i, account_url, credential))
    try:
        loop.run_forever()
    finally:
        # Release the credential's resources on the same loop it was used on.
        loop.run_until_complete(credential.close())
        loop.close()
# This code demonstrates the bypass_cache=true hidden parameter
# https://docs.microsoft.com/en-us/azure/virtual-machines/linux/instance-metadata-service#error-and-debugging
import requests
import threading
import logging
import time
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# urllib3 emits its records under the "urllib3.*" logger names.
# "requests.packages.urllib3" is only a module alias kept by requests and never
# receives records, so configure the real logger name.
requests_log = logging.getLogger("urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
def get_token(counter):
    """Request a storage-scoped managed-identity token from IMDS.

    Even counters append ``bypass_cache=true`` so IMDS skips its token cache;
    odd counters send the plain request. Returns the ``requests`` response.
    """
    token_url = ("http://169.254.169.254/metadata/identity/oauth2/token"
                 "?api-version=2018-02-01"
                 "&resource=https%3A%2F%2Fstorage.azure.com%2F")
    wants_bypass = counter % 2 == 0
    if wants_bypass:
        print(f"{counter} bypass")
        token_url += "&bypass_cache=true"
    else:
        print(f"{counter} no bypass")
    return requests.get(url=token_url, headers={'Metadata': 'true'})
class myThread (threading.Thread):
    """Thread that keeps requesting an IMDS token until one request succeeds.

    Whether the request bypasses the IMDS cache is decided by ``get_token``
    from this thread's ``counter``.
    """

    def __init__(self, counter):
        threading.Thread.__init__(self)
        self.counter = counter
        # Placeholder until the first response from ``get_token`` arrives.
        self.result = ""

    def run(self):
        # Retry in a loop rather than by calling self.run() again. Recursion
        # adds a stack frame per failed attempt and would eventually raise
        # RecursionError if IMDS kept rejecting the requests.
        while True:
            self.result = get_token(self.counter)
            if self.result.status_code == 200:
                break
            time.sleep(2)
        token_response = self.result.json()
        print(f"{self.counter} --- OK {token_response['access_token']}")
# Threads are started for their side effect; a plain loop avoids building a
# throwaway list of None values.
for i in range(48):
    myThread(i).start()
# Retry guidance
# https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#retry-guidance
# This code demonstrates the 5 requests per second limit documented in
# https://docs.microsoft.com/en-us/azure/virtual-machines/linux/instance-metadata-service#error-and-debugging
import requests
import threading
import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# urllib3 emits its records under the "urllib3.*" logger names.
# "requests.packages.urllib3" is only a module alias kept by requests and never
# receives records, so configure the real logger name.
requests_log = logging.getLogger("urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
def get_token():
    """Ask IMDS for a managed-identity token scoped to Azure Storage."""
    imds_token_url = ("http://169.254.169.254/metadata/identity/oauth2/token"
                      "?api-version=2018-02-01"
                      "&resource=https%3A%2F%2Fstorage.azure.com%2F")
    metadata_headers = {'Metadata': 'true'}
    return requests.get(url=imds_token_url, headers=metadata_headers)
class myThread (threading.Thread):
    """Thread that makes exactly one IMDS token request and reports OK or FAILED.

    There is no retry, so throttled requests show up as FAILED lines.
    """

    def __init__(self, counter):
        threading.Thread.__init__(self)
        self.counter = counter
        # Placeholder until the response from ``get_token`` arrives.
        self.result = ""

    def run(self):
        self.result = get_token()
        # Only the status matters here; the token body is not used, so it is
        # not parsed.
        if self.result.status_code == 200:
            print(f"{self.counter} --- OK")
        else:
            print(f"{self.counter} --- FAILED")
# Threads are started for their side effect; a plain loop avoids building a
# throwaway list of None values.
for i in range(48):
    myThread(i).start()
# Retry guidance
# https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#retry-guidance
# This code demonstrates the 5 requests per second limit documented in
# https://docs.microsoft.com/en-us/azure/virtual-machines/linux/instance-metadata-service#error-and-debugging
import requests
import threading
import logging
import time
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# urllib3 emits its records under the "urllib3.*" logger names.
# "requests.packages.urllib3" is only a module alias kept by requests and never
# receives records, so configure the real logger name.
requests_log = logging.getLogger("urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
def get_token():
    """Return the raw ``requests`` response for a storage-scoped IMDS token."""
    base = "http://169.254.169.254/metadata/identity/oauth2/token"
    query = "?api-version=2018-02-01&resource=https%3A%2F%2Fstorage.azure.com%2F"
    return requests.get(url=base + query, headers={'Metadata': 'true'})
class myThread (threading.Thread):
    """Thread that retries an IMDS token request every 2 s until it succeeds.

    On success it prints the token, which lets you compare tokens across
    threads.
    """

    def __init__(self, counter):
        threading.Thread.__init__(self)
        self.counter = counter
        # Placeholder until the first response from ``get_token`` arrives.
        self.result = ""

    def run(self):
        # Retry in a loop rather than by calling self.run() again. Recursion
        # adds a stack frame per failed attempt and would eventually raise
        # RecursionError if IMDS kept rejecting the requests.
        while True:
            self.result = get_token()
            if self.result.status_code == 200:
                break
            print(f"{self.counter} --- FAILED")
            time.sleep(2)
        token_response = self.result.json()
        print(f"{self.counter} --- OK")
        # Note that even the delayed access_tokens are
        # the same one, showing that IMDS is caching the tokens
        print(f"{self.counter} : {token_response['access_token']}")
# Threads are started for their side effect; a plain loop avoids building a
# throwaway list of None values.
for i in range(48):
    myThread(i).start()
# Retry guidance
# https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#retry-guidance
# The following sample shows how to run multi threaded authorization and access to storage
# account. This sample suffers from the IMDS limit of 5 concurrent requests but
# the python SDK has baked in retry policies
import threading
import os
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
import logging
# Configure target storage account in environment. Something like
# export SAMPLE_STORAGE_ACCOUNT_URL="https://myaccount.blob.core.windows.net/"
account_url = os.environ["SAMPLE_STORAGE_ACCOUNT_URL"]
# Enable debug logs to observe requests to IMDS
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
# urllib3 emits its records under the "urllib3.*" logger names.
# "requests.packages.urllib3" is only a module alias kept by requests and never
# receives records, so configure the real logger name.
requests_log = logging.getLogger("urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
class myThread (threading.Thread):
    """Thread that lists the account's containers using its own credential.

    The credential and client are built in ``__init__``, so each thread has a
    separate ``DefaultAzureCredential`` and fetches its own token.
    """

    def __init__(self, counter, account_url):
        super().__init__()
        self.counter = counter
        self.containers = []
        # Skip environment credentials so the managed identity is reached sooner.
        self.credential = DefaultAzureCredential(exclude_environment_credential=True)
        self.client = BlobServiceClient(account_url, credential=self.credential)

    def list_containers(self):
        """Delegate to the service client's container listing."""
        return self.client.list_containers()

    def run(self):
        listing = self.list_containers()
        self.containers = listing
        for entry in listing:
            print(f"{self.counter} --- {entry.name}")
# Threads are started for their side effect; a plain loop avoids building a
# throwaway list of None values.
for i in range(48):
    myThread(i, account_url).start()
# The following sample shows how to run multi-threaded authorization and access to a storage
# account while forcing the acquisition of a single access token up front. That token
# is then reused by all concurrent threads, so they are not affected by the IMDS limit
# of 5 concurrent requests
import threading
import os
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
import logging
# Configure target storage account in environment. Something like
# export SAMPLE_STORAGE_ACCOUNT_URL="https://myaccount.blob.core.windows.net/"
account_url = os.environ["SAMPLE_STORAGE_ACCOUNT_URL"]
# Enable debug logs to observe requests to IMDS
logging.basicConfig(filename='threaded_storage_share_token.log')
logging.getLogger().setLevel(logging.DEBUG)
# urllib3 emits its records under the "urllib3.*" logger names.
# "requests.packages.urllib3" is only a module alias kept by requests and never
# receives records, so configure the real logger name. This script also writes
# its own progress messages through requests_log.
requests_log = logging.getLogger("urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
# One credential shared by every thread below.
credential = DefaultAzureCredential(exclude_environment_credential=True)
# Run a full listing once, up front, so the shared credential fetches and
# caches its access token before any worker thread starts.
containers = BlobServiceClient(
    account_url,
    credential=credential,
).list_containers()
for c in containers:
    requests_log.info(f"Container:{c.name}")
class myThread (threading.Thread):
    """Thread that lists containers through a credential shared with all threads.

    Each thread gets its own ``BlobServiceClient``, but all of them use the one
    credential passed in, so they share its cached token.
    """

    def __init__(self, counter, account_url, credential):
        super().__init__()
        self.counter = counter
        self.containers = []
        self.client = BlobServiceClient(account_url, credential=credential)

    def list_containers(self):
        """Delegate to the service client's container listing."""
        return self.client.list_containers()

    def run(self):
        listing = self.list_containers()
        self.containers = listing
        for entry in listing:
            print(f"{self.counter} --- {entry.name}")
requests_log.info("Starting")
# Threads are started for their side effect; a plain loop avoids building a
# throwaway list of None values.
for i in range(48):
    myThread(i, account_url, credential).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment