Skip to content

Instantly share code, notes, and snippets.

@osantana
Last active October 23, 2015 13:01
Show Gist options
  • Select an option

  • Save osantana/a813416fc36b6517b05c to your computer and use it in GitHub Desktop.

Select an option

Save osantana/a813416fc36b6517b05c to your computer and use it in GitHub Desktop.
Spike de Service Integration
class Error(Exception):
    """Root of the service-integration exception hierarchy."""


class RecoverableError(Error):
    """Failure that the service's recoverable-error path (retry) handles."""


class NonRecoverableError(Error):
    """Failure that is passed to the service's ``error`` handler, not retried."""
class Celery(object):
    """Backend adapter that maps retry operations onto a task object.

    The task is expected to expose ``retry()`` and ``request.retries``
    (the only two members this adapter touches).
    """

    def retry(self, task):
        """Ask ``task`` to retry itself and return whatever ``task.retry()`` returns.

        The result is passed through so callers that return the backend's
        result (as ``AbstractService.retry`` does) do not silently get ``None``.
        Anything raised by ``task.retry()`` propagates unchanged.
        """
        return task.retry()

    def get_retries(self, task):
        """Return the retry counter stored on ``task.request.retries``."""
        return task.request.retries
class ServiceRegister(object):
    """Name-indexed registry of service objects.

    Calling the registry with a service stores it under ``service.get_name()``;
    indexing the registry with a name returns the stored service.
    """

    def __init__(self):
        # Maps service name -> registered service object.
        self.register = {}

    def __call__(self, service):
        """Register ``service`` under its name and return it.

        Returning the service lets the registry be used as a decorator
        without replacing the decorated object by ``None``.

        Raises:
            ValueError: if ``service.get_name()`` is empty or ``None``.
        """
        # Read the name once so the validated value is the one that is stored.
        name = service.get_name()
        if not name:
            # Explicit raise instead of ``assert``: asserts vanish under -O.
            raise ValueError("service must provide a non-empty name")
        self.register[name] = service
        return service

    def __getitem__(self, name):
        """Return the service registered as ``name`` (``KeyError`` if absent)."""
        return self.register[name]
# Module-level registry instance: call it with a service to register it,
# index it with a service name to look the service up.
service_register = ServiceRegister()
class AutoRegister(type):
    """Metaclass placeholder; currently adds nothing on top of ``type``."""

    # TODO: auto-register all Service subclasses
class AbstractService(metaclass=AutoRegister):
    """Base class for an integration with an external service.

    Subclasses supply the concrete client through :meth:`create_client` and
    may override the class attributes and ``get_*`` accessors below. Tasks
    obtain a client with :meth:`get_client`, which dispatches
    ``RecoverableError`` / ``NonRecoverableError`` raised inside the caller's
    ``with`` body to the handler methods at the bottom of the class.
    """

    # Registry name; when falsy, get_name() derives one from the class name.
    name = None
    # NOTE(review): units are not defined here — presumably an availability
    # percentage; confirm.
    sla = 100
    # Number of retries allowed before max_retry_exceeded() is called.
    max_retries = 10
    # Backend instance shared by every service that does not override it.
    backend = Celery()

    def create_client(self):
        """Build the underlying service client; subclasses must override."""
        raise NotImplementedError()

    @contextmanager
    def get_client(self, task):
        """Yield a ``ClientProxy`` for this service while holding the lock.

        A ``RecoverableError`` raised in the ``with`` body is routed to
        :meth:`retry`, or to :meth:`max_retry_exceeded` once the task's retry
        count has reached :meth:`get_max_retries`. A ``NonRecoverableError``
        is routed to :meth:`error`. Both are swallowed after the handler
        returns; anything a handler raises propagates to the caller.

        Args:
            task: the running task; passed to the backend and the handlers.
        """
        # I'm not sure but this error handling and lock should be
        # implemented in ClientProxy wrapper. Not here.
        with self.lock_aquire() as lock:
            # Bound up front so the handlers still receive a value (None)
            # when ClientProxy() itself is what raised.
            proxy = None
            try:
                proxy = ClientProxy(self)
                yield proxy
            except RecoverableError as ex:
                # ">=": once the counter equals the limit, every allowed
                # retry has been used, so no further retry is scheduled.
                if self.get_retries(task) >= self.get_max_retries():
                    self.max_retry_exceeded(proxy, task, ex)
                else:
                    self.retry(proxy, task, ex)
            except NonRecoverableError as ex:
                self.error(proxy, task, ex)
            finally:
                # NOTE(review): leaving the ``with`` block may already release
                # the lock, which would make this a second release — confirm
                # against the Lock implementation.
                lock.release()

    # overridable attributes
    def get_name(self):
        """Return ``name``, or the lowercased class name without "service"."""
        return self.name or self.__class__.__name__.lower().replace("service", "")

    def get_sla(self):
        """Return the ``sla`` attribute."""
        return self.sla

    def get_backend(self):
        """Return the backend used for retry bookkeeping."""
        return self.backend

    def get_retries(self, task):
        """Return the backend's retry count for ``task``."""
        return self.get_backend().get_retries(task)

    def get_max_retries(self):
        """Return the ``max_retries`` attribute."""
        return self.max_retries

    # overridable methods
    def lock_aquire(self):
        """Return the object used as the lock context in :meth:`get_client`.

        NOTE(review): ``Lock`` is not defined in this module — confirm where
        it comes from and what ``Lock.acquire()`` returns.
        """
        return Lock.acquire()

    # handlers
    def max_retry_exceeded(self, proxy, task, exception):
        """Hook called when a recoverable error occurs with no retries left.

        ``proxy`` is ``None`` when the proxy could not be created. The default
        implementation does nothing.
        """
        return

    def retry(self, proxy, task, exception):
        """Hook called to retry ``task`` after a recoverable error.

        Delegates to the backend and returns its result.
        """
        # TODO: fibonacci, etc...
        return self.get_backend().retry(task)

    def error(self, proxy, task, exception):
        """Hook called on a non-recoverable error; does nothing by default."""
        return
class ClientProxy(object):
    """Wrap the client built by a service and forward attribute access to it.

    Attributes:
        service: the service this proxy was created for.
        client: the object returned by ``service.create_client()``.
    """

    def __init__(self, service):
        self.service = service
        # Services expose their client factory as create_client().
        self.client = service.create_client()

    def __getattr__(self, name):
        """Delegate attributes not found on the proxy to the wrapped client.

        Only invoked when normal lookup fails, so ``service`` and ``client``
        are served from the instance itself.
        """
        # If the proxy's own fields are missing (e.g. __init__ has not run),
        # fail instead of recursing through self.client.
        if name in ("service", "client"):
            raise AttributeError(name)
        return getattr(self.client, name)

    # TODO: error handling around the delegated calls
# Usage:
class PartnerService(AbstractService):
    """Example service declaration overriding ``name`` and ``sla``."""

    sla = 99.9
    name = "partner"
@app.task(bind=True)
def taskfoo(task):
    """Example task: call ``foo()`` on the partner service's client.

    The task is declared with ``bind=True`` and receives itself as ``task``,
    which is handed to ``get_client`` for retry handling.

    The partner service must already be in ``service_register`` (for example
    via ``service_register(PartnerService())``); otherwise the lookup raises
    ``KeyError``.
    """
    # Look the service up in the module's registry by its declared name.
    service = service_register[PartnerService.name]
    with service.get_client(task) as client:
        client.foo()
@Fedalto
Copy link

Fedalto commented Oct 22, 2015

O que gostaria de ter como Usage:

# Sketch of a desired API; not runnable as written (see the NOTE lines).
class MyTask(AbstractTask):
    # NOTE(review): `self` is not bound at class-body level, so these two
    # decorators are pseudo-code — presumably "wrap run() with locking and
    # retry handling"; confirm the intended mechanism.
    @self.lock
    @self.retry
    def run(self, *args, **kwargs):
        # NOTE(review): `registry`, `name` and `task` are not defined in this
        # snippet — presumably the service registry, a service name and the
        # running task; confirm.
        with registry[name].get_client(task) as client:
            client.foo()

O que não sei ainda como passar é o retry_countdown atual para o task.retry() =/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment