Skip to content

Instantly share code, notes, and snippets.

@lmazuel
Last active September 27, 2019 23:20
Show Gist options
  • Save lmazuel/85f408e485a114059daf1675136eaf5e to your computer and use it in GitHub Desktop.
Save lmazuel/85f408e485a114059daf1675136eaf5e to your computer and use it in GitHub Desktop.
async SansIO policies
class _SansIOAsyncHTTPPolicyRunner(AsyncHTTPPolicy[HTTPRequestType, AsyncHTTPResponseType]): #pylint: disable=unsubscriptable-object
    """Async implementation of the SansIO policy.

    Modifies the request and sends to the next policy in the chain.
    The hooks of the wrapped policy (``on_request``, ``on_response``,
    ``on_exception``) may return either a plain value or an awaitable;
    awaitable results are awaited before the runner continues.

    :param policy: A SansIO policy.
    :type policy: ~azure.core.pipeline.policies.SansIOHTTPPolicy
    """

    def __init__(self, policy: SansIOHTTPPolicy) -> None:
        super(_SansIOAsyncHTTPPolicyRunner, self).__init__()
        self._policy = policy

    async def send(self, request: PipelineRequest):
        """Modifies the request and sends to the next policy in the chain.

        :param request: The PipelineRequest object.
        :type request: ~azure.core.pipeline.PipelineRequest
        :return: The PipelineResponse object.
        :rtype: ~azure.core.pipeline.PipelineResponse
        """
        method_result = self._policy.on_request(request)
        if hasattr(method_result, '__await__'):
            # Await the hook's own result; ``response`` does not exist yet.
            await method_result
        try:
            response = await self.next.send(request)  # type: ignore
        except Exception:  #pylint: disable=broad-except
            method_result = self._policy.on_exception(request)
            if hasattr(method_result, '__await__'):
                # Keep the awaited value: its truthiness decides whether
                # the exception is re-raised.
                method_result = await method_result
            if not method_result:
                raise
            # NOTE(review): when on_exception reports the exception as
            # handled, ``response`` was never assigned, so the ``return``
            # below raises UnboundLocalError -- confirm the intended result.
        else:
            method_result = self._policy.on_response(request, response)
            if hasattr(method_result, '__await__'):
                await method_result
        return response
class AwaitableSansIOHTTPPolicy(Generic[HTTPRequestType, AsyncHTTPResponseType]):
    """Base class for sans I/O policies whose hooks are awaitable.

    An AwaitableSansIOHTTPPolicy only modifies or mutates a request based
    on the HTTP specification and does not depend on the specifics of any
    particular transport. Such policies are designed to work with an
    AsyncPipeline, since they run on the same event loop, but they do not
    share any I/O or transport-specific details with the transport.

    A policy can act before the request is sent, after the response comes
    back, or both.
    """

    async def on_request(self, request):
        # type: (PipelineRequest) -> None
        """Hook executed before the request is sent by the next policy.

        The default implementation does nothing.

        :param request: Request to be modified before it is sent by the
         next policy.
        :type request: ~azure.core.pipeline.PipelineRequest
        """

    async def on_response(self, request, response):
        # type: (PipelineRequest, PipelineResponse) -> None
        """Hook executed after the response comes back from the next policy.

        The default implementation does nothing.

        :param request: The request that produced the response.
        :type request: ~azure.core.pipeline.PipelineRequest
        :param response: Pipeline response object.
        :type response: ~azure.core.pipeline.PipelineResponse
        """

    #pylint: disable=no-self-use
    async def on_exception(self, _request):  #pylint: disable=unused-argument
        # type: (PipelineRequest) -> bool
        """Hook executed if an exception is raised while running the next policy.

        Developers can optionally override this method and return True when
        the exception has been handled and should not be forwarded to the
        caller. This method is executed inside the exception handler.

        :param _request: The Pipeline request object.
        :type _request: ~azure.core.pipeline.PipelineRequest
        :return: False in this default implementation.
        :rtype: bool

        Example:

        .. literalinclude:: ../examples/test_example_sansio.py
            :start-after: [START on_exception]
            :end-before: [END on_exception]
            :language: python
            :dedent: 4
        """
        return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment