This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def fspath(path): | |
'''https://www.python.org/dev/peps/pep-0519/#os''' | |
if isinstance(path, (str, bytes)): | |
return path | |
# Work from the object's type to match method resolution of other magic | |
# methods. | |
path_type = type(path) | |
try: | |
path = path_type.__fspath__(path) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#------------------------------------------------------------------------- | |
# Copyright (c) Microsoft Corporation. All rights reserved. | |
# Licensed under the MIT License. See License.txt in the project root for | |
# license information. | |
#-------------------------------------------------------------------------- | |
from setuptools import setup | |
from distutils import log as logger |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
while True: | |
pipeline_runs = list(self._adf_client.pipeline_runs.list_by_factory(self.group_name, df_name)) | |
if pipeline_runs: | |
break | |
time.sleep(15) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def upload_template(client, resource_group, json_template): | |
api_version = extract_from_template(json_template) | |
new_batch_client = BatchClient(client.config.credentials, client.config.subscription_id, api_version) | |
models = new_batch_client.models(api_version) | |
built_model = build_template_model(models, json_template) | |
return new_batch_client.template.upload(resource_group, built_model).result() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
context = adal.AuthenticationContext(authority_url, api_version=None) | |
code = context.acquire_user_code(RESOURCE, clientid) | |
print(code['message']) | |
credentials = AdalAuthentication( | |
context.acquire_token_with_device_code, | |
resource, | |
code, | |
clientid | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def wrapper(url, awaitable=False): | |
"""This is wrapper, that return the result or a awaitable future to get the result. | |
This wrapper does not involve asyncio and can parsed by Python 2.7.""" | |
if awaitable: | |
from async_version import foo | |
return foo(url) | |
else: | |
from sync_version import foo | |
return foo(url) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AsyncMixin: | |
async def async_get(self, url): | |
return "Async download: "+url |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async def mymethod(): | |
account_name = self.get_resource_name('pyarmstorage18') | |
# Normal call, async is just HTTP I/O | |
result_check = await self.storage_client.storage_accounts.check_name_availability( | |
account_name | |
) | |
self.assertTrue(result_check.name_available) | |
params_create = models.StorageAccountCreateParameters( |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This starts a poller, and you can call "result" on the poller to get the resource | |
poller = client.storage_accounts.create(parameters) | |
assert isinstance(poller, AzureOperationPoller) | |
account = poller.result() | |
assert isinstance(account, StorageAccount) | |
# This starts a poller, and you can call "result" on the poller to get the ClientRawResponse | |
# THIS IS A BREAKING CHANGE, but diferentiate the resource envelope (raw or not) from the polling | |
# This makes the return type consistent: whatever the value of "raw", return type is an AzureOperationPoller | |
poller = client.storage_accounts.create(parameters, raw=True) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import requests | |
# These two lines enable debugging at httplib level (requests->urllib3->http.client) | |
# You will see the REQUEST, including HEADERS and DATA, and RESPONSE with HEADERS but without DATA. | |
# The only thing missing will be the response.body which is not logged. | |
try: | |
import http.client as http_client | |
except ImportError: | |
# Python 2 |
OlderNewer