Skip to content

Instantly share code, notes, and snippets.

@johanste
Created January 17, 2019 18:52
Show Gist options
  • Save johanste/efab270b259c134a1cad70ae4c1f206b to your computer and use it in GitHub Desktop.
Save johanste/efab270b259c134a1cad70ae4c1f206b to your computer and use it in GitHub Desktop.
Example semi-strongly typed pipeline
from sys import Type
from typing import List
class Policy:
...
class RetryPolicy(Policy):
def __init__(self, max_retries=4, timeout=None):
self.max_retries = max_retries
self.timeout = timeout
class LoggingPolicy(Policy):
...
class TracingPolicy(Policy):
...
class Pipeline:
def __init__(self, policies: "List[Policy]"):
self.policies = policies
self.prepare_request_handlers = []
self.sending_request_handlers = []
def send(self, request):
""" Send the given request through the pipeline"""
# Each prepare_request_handler is called in the order they were registered
for handler in self.prepare_request_handlers:
request = handler(request)
request = self.policies[0].send(request)
# Each sending_request_handler is called in the order they were registered
for handler in self.prepare_request_handlers:
request = handler(request)
# The last policy is the "transport" policy
response = self.policies[-1].send(request)
return response
def add_prepare_request_handler(self, handler):
self.prepare_request_handlers.add(handler)
def get_policy(self, policy_type: "Type") -> "Policy":
""" Get the policy of the given type
Will return the policy if there is one and only one policy of the given type in the
pipeline. Will throw otherwise.
"""
matches = list([policy for policy in self.policies if isinstance(policy, policy_type)])
if len(matches) != 1:
raise ValueError('Found {} matches for policy type {}'.format(len(matches), policy_type.__name__))
return matches[0]
def set_policy(self, policy: "Policy", *, policy_type: "Type"=None):
""" Set the policy of the given type to a new instance.
"""
# Ensure that there is ona and only one policy of the type to replace/set
policy_type_to_replace = policy_type or type(policy)
self.get_policy(policy_type_to_replace)
self.policies = list([policy if isinstance(existing_policy, policy_type_to_replace)
else existing_policy
for existing_policy in self.policies])
# Usage example
# Somehow the default pipeline is constructed for a given client
pipeline = Pipeline([
LoggingPolicy(),
RetryPolicy(),
TracingPolicy()
])
retry_policy = pipeline.get_policy(RetryPolicy)
retry_policy.max_retries = 1
@johanste
Copy link
Author

@lmazuel, I was playing around with some options on how to access parts of the pipeline without having to use indexes/reconstruct the whole thing in the 90+% case of just minor tweaks.

Thoughts?

@KrzysztofCwalina
Copy link

@johanste, how do you envision this being used in clients? i.e. does the client create pipelines or the users of the client create pipelines?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment