Skip to content

Instantly share code, notes, and snippets.

View lmazuel's full-sized avatar

Laurent Mazuel lmazuel

View GitHub Profile
@lmazuel
lmazuel / vm_test_advanced.py
Last active October 23, 2020 16:32
VM PATCH behavior
import logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s:%(levelname)s:%(name)s:%(message)s"
)
from devtools_testutils import mgmt_settings_real
sub_id = mgmt_settings_real.SUBSCRIPTION_ID
cred = mgmt_settings_real.get_azure_core_credentials()
@lmazuel
lmazuel / deseralizer.py
Last active August 14, 2020 23:29
SR brainstorm
with ServiceBusClient.from_connection_string(connstr) as client:
# max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
# Default is None; to receive forever.
with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
for msg in receiver: # ServiceBusReceiver instance is a generator
bytes_data = msg.body
sr_client = SRClientCache(credentials, endpoint)
schema_type: str = SRClientCache.detect_type(bytes_data)
@lmazuel
lmazuel / blog.py
Last active April 14, 2023 20:56
HTTPX azure-core blog. Sample from blog post: http://link-when-public
from typing import Iterator, Optional, ContextManager
from azure.core.pipeline.transport import HttpResponse, HttpTransport, HttpRequest
import httpx
class HttpXTransportResponse(HttpResponse):
def __init__(self,
request: HttpRequest,
httpx_response: httpx.Response,
# Removes RG that start with that prefix
import os
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
RG_PREFIX = "pycog"
def get_credentials():
@lmazuel
lmazuel / cred_wrapper.py
Last active June 5, 2024 15:53
Use azure-identity to authenticate mgmt SDKs that needs "msrestazure/azure.common.credentials" protocol
# Wrap credentials from azure-identity to be compatible with SDK that needs msrestazure or azure.common.credentials
# Need msrest >= 0.6.0
# See also https://pypi.org/project/azure-identity/
from msrest.authentication import BasicTokenAuthentication
from azure.core.pipeline.policies import BearerTokenCredentialPolicy
from azure.core.pipeline import PipelineRequest, PipelineContext
from azure.core.pipeline.transport import HttpRequest
@lmazuel
lmazuel / generate_all_swagger.py
Created March 27, 2020 23:03
SwaggerToSdk generate them all
SWAGGERS = r"""
specification\addons\resource-manager\readme.md
specification\adhybridhealthservice\resource-manager\readme.md
specification\advisor\resource-manager\readme.md
specification\alertsmanagement\resource-manager\readme.md
specification\analysisservices\resource-manager\readme.md
specification\apimanagement\control-plane\readme.md
specification\apimanagement\resource-manager\readme.md
specification\appconfiguration\data-plane\readme.md
specification\appconfiguration\resource-manager\readme.md
@lmazuel
lmazuel / mgmt.py
Created January 30, 2020 22:36
azure-identity and new autorest
from azure.identity import ClientSecretCredential
from azure.mgmt.storage import StorageManagementClient
def get_credentials_identity():
subscription_id = os.environ.get(
'AZURE_SUBSCRIPTION_ID',
'11111111-1111-1111-1111-111111111111') # your Azure Subscription Id
credentials = ClientSecretCredential(
client_id=os.environ['AZURE_CLIENT_ID'],
@lmazuel
lmazuel / flask.py
Last active October 1, 2019 17:06
Etag
from flask import Flask, request
from flask_restful import Resource, Api
from azure.appconfiguration import AzureAppConfigurationClient
app = Flask(__name__)
api = Api(app)
class Config(Resource):
def __init__(self):
@lmazuel
lmazuel / awaitable_return_type.py
Last active September 27, 2019 23:20
async SansIO policies
class _SansIOAsyncHTTPPolicyRunner(AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]): #pylint: disable=unsubscriptable-object
"""Async implementation of the SansIO policy.
Modifies the request and sends to the next policy in the chain.
:param policy: A SansIO policy.
:type policy: ~azure.core.pipeline.policies.SansIOHTTPPolicy
"""
def __init__(self, policy: SansIOHTTPPolicy) -> None:
@lmazuel
lmazuel / met.py
Created June 3, 2019 21:21
Metaclass
from functools import wraps
from six import add_metaclass
class MetaCount(type):
def __new__(cls, *args, **kwargs):
obj = super(MetaCount, cls).__new__(cls, *args, **kwargs)
obj.read_count = 0
if hasattr(obj, 'read'):
original_read = obj.read
@wraps(original_read)