Last active
October 23, 2015 13:01
-
-
Save osantana/a813416fc36b6517b05c to your computer and use it in GitHub Desktop.
Spike de Service Integration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Error(Exception):
    """Base class for the exceptions defined in this module."""
class RecoverableError(Error):
    """Failure that ``AbstractService.get_client`` handles on its retry path."""
class NonRecoverableError(Error):
    """Failure that ``AbstractService.get_client`` passes to the ``error`` handler."""
class Celery(object):
    """Backend adapter that forwards retry operations to a task object.

    The task is expected to offer ``retry()`` and ``request.retries``,
    which is the interface of a bound Celery task.
    """

    def retry(self, task, **options):
        """Ask *task* to retry itself.

        Keyword ``options`` (for example ``countdown=30``) are passed
        through untouched to ``task.retry()``, so callers can control the
        retry delay.  Calling without options behaves exactly as before.
        """
        task.retry(**options)

    def get_retries(self, task):
        """Return the retry count recorded on ``task.request``."""
        return task.request.retries
class ServiceRegister(object):
    """Name-indexed registry of service instances.

    Calling the registry with a service stores it under the value of
    ``service.get_name()``; indexing the registry returns the stored service.
    """

    def __init__(self):
        # Maps service name -> registered service object.
        self.register = {}

    def __call__(self, service):
        """Register *service* under its name.

        Raises:
            ValueError: if ``service.get_name()`` returns a falsy value.
        """
        # Resolve the name once so the validated value is the stored key.
        name = service.get_name()
        if not name:
            # An explicit raise survives ``python -O``; an ``assert`` would not.
            raise ValueError("service must provide a non-empty name")
        self.register[name] = service

    def __getitem__(self, name):
        """Return the service registered as *name* (``KeyError`` if absent)."""
        return self.register[name]


# Shared module-level registry instance.
service_register = ServiceRegister()
class AutoRegister(type):
    """Metaclass placeholder; it currently adds nothing to ``type``."""

    # TODO: auto-register all Service subclasses
class AbstractService(metaclass=AutoRegister):
    """Base class for a service integration.

    Subclasses override the class attributes below and ``create_client``.
    ``get_client`` is the entry point used by tasks: it yields a
    ``ClientProxy`` and translates the module's error types into calls to
    the ``retry`` / ``max_retry_exceeded`` / ``error`` handlers.

    NOTE(review): ``contextmanager`` (from ``contextlib``) and ``Lock`` are
    used here but are not imported or defined in the visible source; they
    must be provided at module level.
    """

    # Registry name; when falsy, get_name() derives one from the class name.
    name = None
    # NOTE(review): unit is not shown here -- presumably a percentage; confirm.
    sla = 100
    # Retry count above which get_client() stops retrying.
    max_retries = 10
    # Backend that performs the retry and reports the retry count.
    backend = Celery()

    def create_client(self):
        """Build the underlying client object; subclasses must override."""
        raise NotImplementedError()

    @contextmanager
    def get_client(self, task):
        """Yield a ``ClientProxy`` for this service while holding the lock.

        A ``RecoverableError`` raised inside the ``with`` body triggers
        ``retry`` until the task's retry count exceeds the maximum, after
        which ``max_retry_exceeded`` is called instead.  A
        ``NonRecoverableError`` is passed to ``error``.  Unless a handler
        raises, the exception does not propagate to the caller.
        """
        # I'm not sure but this error handling and lock should be
        # implemented in ClientProxy wrapper. Not here.
        with self.lock_aquire() as lock:
            # Pre-bind so the handlers get a defined value even when the
            # proxy construction itself is what failed.
            proxy = None
            try:
                proxy = ClientProxy(self)
                yield proxy
            except RecoverableError as ex:
                if self.get_retries(task) > self.get_max_retries(task):
                    # A generator's return value is discarded by
                    # @contextmanager, so the handler result is not returned.
                    self.max_retry_exceeded(proxy, task, ex)
                else:
                    self.retry(proxy, task, ex)
            except NonRecoverableError as ex:
                self.error(proxy, task, ex)
            finally:
                # NOTE(review): the lock is also used as a context manager
                # above; confirm this explicit release is not a double release.
                lock.release()

    # overridables attributes
    def get_name(self):
        """Return ``name``, or the lowercased class name without "service"."""
        return self.name or self.__class__.__name__.lower().replace("service", "")

    def get_sla(self):
        """Return the ``sla`` attribute."""
        return self.sla

    def get_backend(self):
        """Return the ``backend`` attribute."""
        return self.backend

    def get_retries(self, task):
        """Return the retry count the backend reports for *task*."""
        return self.get_backend().get_retries(task)

    def get_max_retries(self, task=None):
        """Return the retry limit.

        *task* is accepted (and ignored by this default implementation)
        because ``get_client`` passes it; overrides may use it for a
        per-task limit.
        """
        return self.max_retries

    # overridable methods
    def lock_aquire(self):
        """Return the lock object used by ``get_client``."""
        return Lock.acquire()

    # handlers
    def max_retry_exceeded(self, proxy, task, exception):
        """Hook called when a recoverable error occurs past the retry limit."""
        return

    def retry(self, proxy, task, exception):
        """Hook called for a recoverable error; delegates to the backend."""
        # TODO: fibonacci, etc...
        return self.get_backend().retry(task)

    def error(self, proxy, task, exception):
        """Hook called when a non-recoverable error occurs."""
        return
class ClientProxy(object):
    """Wrapper around the client created by a service.

    Attributes not found on the proxy itself are looked up on the wrapped
    client, so the proxy can be used wherever the client would be.
    """

    def __init__(self, service):
        self.service = service
        # The service API for building a client is ``create_client()``.
        self.client = service.create_client()

    def __getattr__(self, attr):
        """Delegate unknown attribute access to the wrapped client."""
        # __getattr__ only runs when normal lookup fails; refuse the proxy's
        # own attributes so a half-initialised instance cannot recurse.
        if attr in ("service", "client"):
            raise AttributeError(attr)
        return getattr(self.client, attr)

    # TODO: add error handling to the delegating proxy
| # Usage: | |
class PartnerService(AbstractService):
    """Example service definition overriding ``name`` and ``sla``."""

    # Name returned by get_name() for this service.
    name = "partner"
    # Overrides the base-class default of 100.
    sla = 99.9
@app.task(bind=True)
def taskfoo(task):
    """Example task that calls ``foo()`` on the partner service client.

    NOTE(review): ``app`` is not defined in this file -- presumably the
    Celery application; confirm.  The lookup also requires that a
    ``PartnerService`` instance has been added to ``service_register``.
    """
    # Use the module-level registry and the service's declared name.
    with service_register[PartnerService.name].get_client(task) as client:
        client.foo()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
O que gostaria de ter como Usage:
O que não sei ainda como passar é o retry_countdown atual para o task.retry() =/