Created
January 17, 2019 18:52
-
-
Save johanste/efab270b259c134a1cad70ae4c1f206b to your computer and use it in GitHub Desktop.
Example semi-strongly typed pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sys import Type | |
from typing import List | |
class Policy: | |
... | |
class RetryPolicy(Policy): | |
def __init__(self, max_retries=4, timeout=None): | |
self.max_retries = max_retries | |
self.timeout = timeout | |
class LoggingPolicy(Policy): | |
... | |
class TracingPolicy(Policy): | |
... | |
class Pipeline: | |
def __init__(self, policies: "List[Policy]"): | |
self.policies = policies | |
self.prepare_request_handlers = [] | |
self.sending_request_handlers = [] | |
def send(self, request): | |
""" Send the given request through the pipeline""" | |
# Each prepare_request_handler is called in the order they were registered | |
for handler in self.prepare_request_handlers: | |
request = handler(request) | |
request = self.policies[0].send(request) | |
# Each sending_request_handler is called in the order they were registered | |
for handler in self.prepare_request_handlers: | |
request = handler(request) | |
# The last policy is the "transport" policy | |
response = self.policies[-1].send(request) | |
return response | |
def add_prepare_request_handler(self, handler): | |
self.prepare_request_handlers.add(handler) | |
def get_policy(self, policy_type: "Type") -> "Policy": | |
""" Get the policy of the given type | |
Will return the policy if there is one and only one policy of the given type in the | |
pipeline. Will throw otherwise. | |
""" | |
matches = list([policy for policy in self.policies if isinstance(policy, policy_type)]) | |
if len(matches) != 1: | |
raise ValueError('Found {} matches for policy type {}'.format(len(matches), policy_type.__name__)) | |
return matches[0] | |
def set_policy(self, policy: "Policy", *, policy_type: "Type"=None): | |
""" Set the policy of the given type to a new instance. | |
""" | |
# Ensure that there is ona and only one policy of the type to replace/set | |
policy_type_to_replace = policy_type or type(policy) | |
self.get_policy(policy_type_to_replace) | |
self.policies = list([policy if isinstance(existing_policy, policy_type_to_replace) | |
else existing_policy | |
for existing_policy in self.policies]) | |
# Usage example | |
# Somehow the default pipeline is constructed for a given client | |
pipeline = Pipeline([ | |
LoggingPolicy(), | |
RetryPolicy(), | |
TracingPolicy() | |
]) | |
retry_policy = pipeline.get_policy(RetryPolicy) | |
retry_policy.max_retries = 1 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@johanste, how do you envision this being used in clients? i.e. does the client create pipelines or the users of the client create pipelines?