Skip to content

Instantly share code, notes, and snippets.

@lmazuel
Last active April 14, 2023 20:56
Show Gist options
  • Save lmazuel/8ca3a8462a3167c5e5587afac9ada155 to your computer and use it in GitHub Desktop.
Save lmazuel/8ca3a8462a3167c5e5587afac9ada155 to your computer and use it in GitHub Desktop.
HTTPX azure-core blog. Sample from blog post: http://link-when-public
from typing import Iterator, Optional, ContextManager
from azure.core.pipeline.transport import HttpResponse, HttpTransport, HttpRequest
import httpx
class HttpXTransportResponse(HttpResponse):
def __init__(self,
request: HttpRequest,
httpx_response: httpx.Response,
stream_contextmanager: Optional[ContextManager]=None,
):
super(HttpXTransportResponse, self).__init__(request, httpx_response)
self.status_code = httpx_response.status_code
self.headers = httpx_response.headers
self.reason = httpx_response.reason_phrase
self.content_type = httpx_response.headers.get('content-type')
self.stream_contextmanager = stream_contextmanager
def body(self):
return self.internal_response.content
def stream_download(self, _) -> Iterator[bytes]:
return HttpxStreamDownloadGenerator(_, self)
class HttpxStreamDownloadGenerator(object):
def __init__(self, _, response):
self.response = response
self.iter_bytes_func = self.response.internal_response.iter_bytes()
def __iter__(self):
return self
def __next__(self):
try:
return next(self.iter_bytes_func)
except StopIteration:
self.response.stream_contextmanager.__exit__()
raise
class HttpXTransport(HttpTransport):
def __init__(self):
self.client = None
def open(self):
self.client = httpx.Client()
def close(self):
self.client = None
def __enter__(self) -> "HttpXTransport":
self.open()
return self
def __exit__(self, *args):
self.close()
def send(self, request: HttpRequest, **kwargs) -> HttpResponse:
print(f"I was told to send a {request.method} request to {request.url}")
stream_response = kwargs.pop("stream", False)
parameters = {
"method": request.method,
"url": request.url,
"headers": request.headers.items(),
"data": request.data,
"files": request.files,
"allow_redirects": False,
**kwargs
}
stream_ctx = None # type: Optional[ContextManager]
if stream_response:
stream_ctx = self.client.stream(**parameters)
response = stream_ctx.__enter__()
else:
response = self.client.request(**parameters)
return HttpXTransportResponse(
request,
response,
stream_contextmanager=stream_ctx,
)
# Using it in a SDK
if __name__ == "__main__":
from azure.storage.blob import BlobClient
def raw_response_hook(pipeline_response):
print("Checking that I can ask the HTTPX response if I want to: ")
print(type(pipeline_response.http_response.internal_response))
blob_client = BlobClient(
'https://lmazuelblog.blob.core.windows.net/',
'demo',
'blog.txt',
transport=HttpXTransport()
)
with blob_client:
# A non-stream query
blob = blob_client.get_blob_properties(
raw_response_hook=raw_response_hook
)
print(f"The blog name is {blob.name}\n")
data = blob_client.download_blob(
raw_response_hook=raw_response_hook
)
print(f"The blob content is {data.content_as_text()}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment