This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
logging.basicConfig( | |
level=logging.DEBUG, | |
format="%(asctime)s:%(levelname)s:%(name)s:%(message)s" | |
) | |
from devtools_testutils import mgmt_settings_real | |
sub_id = mgmt_settings_real.SUBSCRIPTION_ID | |
cred = mgmt_settings_real.get_azure_core_credentials() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
with ServiceBusClient.from_connection_string(connstr) as client: | |
# max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt. | |
# Default is None; to receive forever. | |
with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver: | |
for msg in receiver: # ServiceBusReceiver instance is a generator | |
bytes_data = msg.body | |
sr_client = SRClientCache(credentials, endpoint) | |
schema_type: str = SRClientCache.detect_type(bytes_data) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Iterator, Optional, ContextManager | |
from azure.core.pipeline.transport import HttpResponse, HttpTransport, HttpRequest | |
import httpx | |
class HttpXTransportResponse(HttpResponse): | |
def __init__(self, | |
request: HttpRequest, | |
httpx_response: httpx.Response, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Removes RG that start with that prefix | |
import os | |
from azure.common.credentials import ServicePrincipalCredentials | |
from azure.mgmt.resource import ResourceManagementClient | |
RG_PREFIX = "pycog" | |
def get_credentials(): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Wrap credentials from azure-identity to be compatible with SDK that needs msrestazure or azure.common.credentials | |
# Need msrest >= 0.6.0 | |
# See also https://pypi.org/project/azure-identity/ | |
from msrest.authentication import BasicTokenAuthentication | |
from azure.core.pipeline.policies import BearerTokenCredentialPolicy | |
from azure.core.pipeline import PipelineRequest, PipelineContext | |
from azure.core.pipeline.transport import HttpRequest |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SWAGGERS = r""" | |
specification\addons\resource-manager\readme.md | |
specification\adhybridhealthservice\resource-manager\readme.md | |
specification\advisor\resource-manager\readme.md | |
specification\alertsmanagement\resource-manager\readme.md | |
specification\analysisservices\resource-manager\readme.md | |
specification\apimanagement\control-plane\readme.md | |
specification\apimanagement\resource-manager\readme.md | |
specification\appconfiguration\data-plane\readme.md | |
specification\appconfiguration\resource-manager\readme.md |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from azure.identity import ClientSecretCredential | |
from azure.mgmt.storage import StorageManagementClient | |
def get_credentials_identity(): | |
subscription_id = os.environ.get( | |
'AZURE_SUBSCRIPTION_ID', | |
'11111111-1111-1111-1111-111111111111') # your Azure Subscription Id | |
credentials = ClientSecretCredential( | |
client_id=os.environ['AZURE_CLIENT_ID'], |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, request | |
from flask_restful import Resource, Api | |
from azure.appconfiguration import AzureAppConfigurationClient | |
app = Flask(__name__) | |
api = Api(app) | |
class Config(Resource): | |
def __init__(self): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class _SansIOAsyncHTTPPolicyRunner(AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]): #pylint: disable=unsubscriptable-object | |
"""Async implementation of the SansIO policy. | |
Modifies the request and sends to the next policy in the chain. | |
:param policy: A SansIO policy. | |
:type policy: ~azure.core.pipeline.policies.SansIOHTTPPolicy | |
""" | |
def __init__(self, policy: SansIOHTTPPolicy) -> None: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import wraps | |
from six import add_metaclass | |
class MetaCount(type): | |
def __new__(cls, *args, **kwargs): | |
obj = super(MetaCount, cls).__new__(cls, *args, **kwargs) | |
obj.read_count = 0 | |
if hasattr(obj, 'read'): | |
original_read = obj.read | |
@wraps(original_read) |