Last active
September 1, 2020 15:10
-
-
Save rndazurescript/1d62c60ae270c6361d6e685ec0ef880d to your computer and use it in GitHub Desktop.
Accessing storage account in parallel using managed identity
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The following sample shows how to run async authentication and access to storage | |
# account. This sample suffers from the IMDS limit of 5 concurrent requests but | |
# the python SDK has baked in retry policies | |
import asyncio | |
import logging | |
import os | |
from azure.identity.aio import DefaultAzureCredential | |
from azure.storage.blob.aio import BlobServiceClient | |
import logging | |
# Based on https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/textanalytics/azure-ai-textanalytics/samples/async_samples/sample_authentication_async.py | |
# Configure target storage account in environment. Something like | |
# export SAMPLE_STORAGE_ACCOUNT_URL="https://myaccount.blob.core.windows.net/" | |
account_url = os.environ["SAMPLE_STORAGE_ACCOUNT_URL"] | |
# Enable debug logs to observe requests to IMDS | |
logging.basicConfig() | |
logging.getLogger().setLevel(logging.DEBUG) | |
requests_log = logging.getLogger("requests.packages.urllib3") | |
requests_log.setLevel(logging.DEBUG) | |
requests_log.propagate = True | |
class AuthenticationSampleAsync(object): | |
def __init__(self, counter, account_url): | |
self.counter = counter | |
self.account_url = account_url | |
async def authentication_with_azure_active_directory_async(self): | |
logging.info("authentication_with_azure_active_directory_async") | |
result = [] | |
# Skipping environment credentials to speed up the process of using | |
# user defined managed identity | |
async with DefaultAzureCredential(exclude_environment_credential=True) as cred: | |
async with BlobServiceClient(self.account_url, credential=cred) as blob: | |
async for el in blob.list_containers(): | |
result.append(el) | |
for c in result: | |
logging.info(f"{self.counter} --- {c.name}") | |
async def runAsync(counter, account_url): | |
runner = AuthenticationSampleAsync(counter, account_url) | |
await runner.authentication_with_azure_active_directory_async() | |
if __name__ == '__main__': | |
print("Press ctrl+c to exit") | |
for i in range(48): | |
asyncio.ensure_future(runAsync(i, account_url)) | |
loop = asyncio.get_event_loop() | |
loop.run_forever() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This code demonstrates the bypass_cache=true hidden parameter | |
# https://docs.microsoft.com/en-us/azure/virtual-machines/linux/instance-metadata-service#error-and-debugging | |
import requests | |
import threading | |
import logging | |
import time | |
logging.basicConfig() | |
logging.getLogger().setLevel(logging.DEBUG) | |
requests_log = logging.getLogger("requests.packages.urllib3") | |
requests_log.setLevel(logging.DEBUG) | |
requests_log.propagate = True | |
def get_token(counter): | |
if (counter%2 ==0): | |
print (f"{counter} bypass") | |
return requests.get(url = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fstorage.azure.com%2F&bypass_cache=true", headers={'Metadata': 'true'}) | |
else: | |
print (f"{counter} no bypass") | |
return requests.get(url = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fstorage.azure.com%2F", headers={'Metadata': 'true'}) | |
class myThread (threading.Thread): | |
def __init__(self, counter): | |
threading.Thread.__init__(self) | |
self.counter = counter | |
self.result = "" | |
def run(self): | |
self.result = get_token(self.counter) | |
if (self.result.status_code == 200): | |
a = self.result.json() | |
print(f"{self.counter} --- OK {a['access_token']}") | |
else: | |
# print (f"{self.counter} --- FAILED") | |
time.sleep(2) | |
self.run() | |
[ myThread(i).start() for i in range(48)] | |
# Retry guidance | |
# https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#retry-guidance | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This code demonstrates the 5 requests per second limit documented in | |
# https://docs.microsoft.com/en-us/azure/virtual-machines/linux/instance-metadata-service#error-and-debugging | |
import requests | |
import threading | |
import logging | |
logging.basicConfig() | |
logging.getLogger().setLevel(logging.DEBUG) | |
requests_log = logging.getLogger("requests.packages.urllib3") | |
requests_log.setLevel(logging.DEBUG) | |
requests_log.propagate = True | |
def get_token(): | |
return requests.get(url = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fstorage.azure.com%2F", headers={'Metadata': 'true'}) | |
class myThread (threading.Thread): | |
def __init__(self, counter): | |
threading.Thread.__init__(self) | |
self.counter = counter | |
self.result = "" | |
def run(self): | |
self.result = get_token() | |
if (self.result.status_code == 200): | |
a = self.result.json() | |
print(f"{self.counter} --- OK") | |
else: | |
print (f"{self.counter} --- FAILED") | |
[ myThread(i).start() for i in range(48)] | |
# Retry guidance | |
# https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#retry-guidance | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This code demonstrates the 5 requests per second limit documented in | |
# https://docs.microsoft.com/en-us/azure/virtual-machines/linux/instance-metadata-service#error-and-debugging | |
import requests | |
import threading | |
import logging | |
import time | |
logging.basicConfig() | |
logging.getLogger().setLevel(logging.DEBUG) | |
requests_log = logging.getLogger("requests.packages.urllib3") | |
requests_log.setLevel(logging.DEBUG) | |
requests_log.propagate = True | |
def get_token(): | |
return requests.get(url = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fstorage.azure.com%2F", headers={'Metadata': 'true'}) | |
class myThread (threading.Thread): | |
def __init__(self, counter): | |
threading.Thread.__init__(self) | |
self.counter = counter | |
self.result = "" | |
def run(self): | |
self.result = get_token() | |
if (self.result.status_code == 200): | |
a = self.result.json() | |
print(f"{self.counter} --- OK") | |
# Note that even the delayed access_tokens are | |
# the same one, showing that IMDS is caching the tokens | |
print(f"{self.counter} : {a['access_token']}") | |
else: | |
print (f"{self.counter} --- FAILED") | |
time.sleep(2) | |
self.run() | |
[ myThread(i).start() for i in range(48)] | |
# Retry guidance | |
# https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#retry-guidance | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The following sample shows how to run multi threaded authorization and access to storage | |
# account. This sample suffers from the IMDS limit of 5 concurrent requests but | |
# the python SDK has baked in retry policies | |
import threading | |
import os | |
from azure.identity import DefaultAzureCredential | |
from azure.storage.blob import BlobServiceClient | |
import logging | |
# Configure target storage account in environment. Something like | |
# export SAMPLE_STORAGE_ACCOUNT_URL="https://myaccount.blob.core.windows.net/" | |
account_url = os.environ["SAMPLE_STORAGE_ACCOUNT_URL"] | |
# Enable debug logs to observe requests to IMDS | |
logging.basicConfig() | |
logging.getLogger().setLevel(logging.DEBUG) | |
requests_log = logging.getLogger("requests.packages.urllib3") | |
requests_log.setLevel(logging.DEBUG) | |
requests_log.propagate = True | |
class myThread (threading.Thread): | |
def __init__(self, counter, account_url): | |
threading.Thread.__init__(self) | |
self.counter = counter | |
self.containers = [] | |
self.credential = DefaultAzureCredential(exclude_environment_credential=True) | |
self.client = BlobServiceClient(account_url, credential=self.credential) | |
def list_containers(self): | |
return self.client.list_containers() | |
def run(self): | |
self.containers = self.list_containers() | |
for c in self.containers: | |
print(f"{self.counter} --- {c.name}") | |
[ myThread(i, account_url).start() for i in range(48)] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment