Last active
April 14, 2023 20:56
-
-
Save lmazuel/8ca3a8462a3167c5e5587afac9ada155 to your computer and use it in GitHub Desktop.
HTTPX azure-core blog. Sample from blog post: http://link-when-public
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Iterator, Optional, ContextManager | |
from azure.core.pipeline.transport import HttpResponse, HttpTransport, HttpRequest | |
import httpx | |
class HttpXTransportResponse(HttpResponse): | |
def __init__(self, | |
request: HttpRequest, | |
httpx_response: httpx.Response, | |
stream_contextmanager: Optional[ContextManager]=None, | |
): | |
super(HttpXTransportResponse, self).__init__(request, httpx_response) | |
self.status_code = httpx_response.status_code | |
self.headers = httpx_response.headers | |
self.reason = httpx_response.reason_phrase | |
self.content_type = httpx_response.headers.get('content-type') | |
self.stream_contextmanager = stream_contextmanager | |
def body(self): | |
return self.internal_response.content | |
def stream_download(self, _) -> Iterator[bytes]: | |
return HttpxStreamDownloadGenerator(_, self) | |
class HttpxStreamDownloadGenerator(object): | |
def __init__(self, _, response): | |
self.response = response | |
self.iter_bytes_func = self.response.internal_response.iter_bytes() | |
def __iter__(self): | |
return self | |
def __next__(self): | |
try: | |
return next(self.iter_bytes_func) | |
except StopIteration: | |
self.response.stream_contextmanager.__exit__() | |
raise | |
class HttpXTransport(HttpTransport): | |
def __init__(self): | |
self.client = None | |
def open(self): | |
self.client = httpx.Client() | |
def close(self): | |
self.client = None | |
def __enter__(self) -> "HttpXTransport": | |
self.open() | |
return self | |
def __exit__(self, *args): | |
self.close() | |
def send(self, request: HttpRequest, **kwargs) -> HttpResponse: | |
print(f"I was told to send a {request.method} request to {request.url}") | |
stream_response = kwargs.pop("stream", False) | |
parameters = { | |
"method": request.method, | |
"url": request.url, | |
"headers": request.headers.items(), | |
"data": request.data, | |
"files": request.files, | |
"allow_redirects": False, | |
**kwargs | |
} | |
stream_ctx = None # type: Optional[ContextManager] | |
if stream_response: | |
stream_ctx = self.client.stream(**parameters) | |
response = stream_ctx.__enter__() | |
else: | |
response = self.client.request(**parameters) | |
return HttpXTransportResponse( | |
request, | |
response, | |
stream_contextmanager=stream_ctx, | |
) | |
# Using it in a SDK | |
if __name__ == "__main__": | |
from azure.storage.blob import BlobClient | |
def raw_response_hook(pipeline_response): | |
print("Checking that I can ask the HTTPX response if I want to: ") | |
print(type(pipeline_response.http_response.internal_response)) | |
blob_client = BlobClient( | |
'https://lmazuelblog.blob.core.windows.net/', | |
'demo', | |
'blog.txt', | |
transport=HttpXTransport() | |
) | |
with blob_client: | |
# A non-stream query | |
blob = blob_client.get_blob_properties( | |
raw_response_hook=raw_response_hook | |
) | |
print(f"The blog name is {blob.name}\n") | |
data = blob_client.download_blob( | |
raw_response_hook=raw_response_hook | |
) | |
print(f"The blob content is {data.content_as_text()}") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment