Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from sys import Type
- from typing import List
class Policy:
    """Common base class for everything that can be placed in a pipeline.

    It defines no behavior of its own; concrete policies subclass it so
    that they can be looked up by type.
    """
class RetryPolicy(Policy):
    """Policy object holding retry-related settings.

    Attributes:
        max_retries: Value given at construction (defaults to 4).
            Presumably the upper bound on retry attempts -- nothing in this
            file reads it, so confirm against the consumer.
        timeout: Value given at construction (defaults to ``None``).
            Units and meaning are not established here -- TODO confirm.
    """

    def __init__(self, max_retries=4, timeout=None):
        """Store the supplied settings on the instance unchanged."""
        self.max_retries, self.timeout = max_retries, timeout
class LoggingPolicy(Policy):
    """Placeholder policy type for logging; it adds nothing to ``Policy``."""
class TracingPolicy(Policy):
    """Placeholder policy type for tracing; it adds nothing to ``Policy``."""
class Pipeline:
    """Ordered list of policies plus request handlers that run around them.

    Attributes:
        policies: The policies, in order. The first one is invoked after the
            prepare handlers; the last one is treated as the "transport"
            policy and produces the response.
        prepare_request_handlers: Callables ``handler(request) -> request``
            run, in registration order, before any policy sees the request.
        sending_request_handlers: Callables ``handler(request) -> request``
            run, in registration order, just before the transport policy.
    """

    def __init__(self, policies: "List[Policy]"):
        """Create a pipeline over ``policies`` with no handlers registered."""
        self.policies = policies
        self.prepare_request_handlers = []
        self.sending_request_handlers = []

    def send(self, request):
        """Send the given request through the pipeline.

        Args:
            request: The request object; it is passed to each handler and
                policy in turn and replaced by whatever they return.

        Returns:
            Whatever the last (transport) policy's ``send`` returns.
        """
        # Each prepare_request_handler is called in the order they were registered
        for handler in self.prepare_request_handlers:
            request = handler(request)

        # NOTE(review): only the first policy is invoked here, and with a
        # single-policy pipeline that same policy is also used as the
        # transport below. Kept as in the original -- confirm the intended
        # chaining of intermediate policies.
        request = self.policies[0].send(request)

        # Each sending_request_handler is called in the order they were registered
        for handler in self.sending_request_handlers:
            request = handler(request)

        # The last policy is the "transport" policy
        return self.policies[-1].send(request)

    def add_prepare_request_handler(self, handler):
        """Register ``handler`` to run before the first policy."""
        self.prepare_request_handlers.append(handler)

    def add_sending_request_handler(self, handler):
        """Register ``handler`` to run just before the transport policy."""
        self.sending_request_handlers.append(handler)

    def get_policy(self, policy_type: "Type") -> "Policy":
        """Get the policy of the given type.

        Args:
            policy_type: Class to match with ``isinstance`` (subclasses
                therefore match too).

        Returns:
            The single policy in the pipeline that is an instance of
            ``policy_type``.

        Raises:
            ValueError: If zero or more than one policy matches.
        """
        matches = [policy for policy in self.policies if isinstance(policy, policy_type)]
        if len(matches) != 1:
            raise ValueError('Found {} matches for policy type {}'.format(len(matches), policy_type.__name__))
        return matches[0]

    def set_policy(self, policy: "Policy", *, policy_type: "Type" = None):
        """Replace the policy of the given type with a new instance.

        Args:
            policy: The new policy to put into the pipeline.
            policy_type: Type of the existing policy to replace. Defaults to
                ``type(policy)``.

        Raises:
            ValueError: If the pipeline does not contain exactly one policy
                of the type being replaced.
        """
        policy_type_to_replace = type(policy) if policy_type is None else policy_type
        # Ensure that there is one and only one policy of the type to
        # replace/set; get_policy raises otherwise.
        self.get_policy(policy_type_to_replace)
        self.policies = [
            policy if isinstance(existing_policy, policy_type_to_replace) else existing_policy
            for existing_policy in self.policies
        ]
# Usage example
# A default pipeline, built here by hand as a stand-in for whatever a real
# client would construct.
pipeline = Pipeline([LoggingPolicy(), RetryPolicy(), TracingPolicy()])

# Look up the single retry policy by type and adjust it in place.
retry_policy = pipeline.get_policy(RetryPolicy)
retry_policy.max_retries = 1
Add Comment
Please, Sign In to add comment