Last active
December 22, 2023 13:22
-
-
Save jsmitka/6e2326d5f0443f3a060a6f73eae6f34e to your computer and use it in GitHub Desktop.
Actor interfaces for Dramatiq
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from abc import ABCMeta | |
from dramatiq import actor, Message, get_broker | |
def implementation_do_call(self, *args, **kwargs):
    """Forward a call on an actor implementation instance to its ``perform()``.

    Written as a plain module-level function so it can be attached to a class
    as ``__call__``; ``self`` is then the instance being called.

    Args:
        self: Object exposing a ``perform()`` method.
        *args: Positional arguments passed through to ``perform()`` unchanged.
        **kwargs: Keyword arguments passed through to ``perform()`` unchanged.

    Returns:
        Whatever ``self.perform(*args, **kwargs)`` returns.
    """
    return self.perform(*args, **kwargs)
class BaseActorInterfaceMeta(ABCMeta):
    """Metaclass that classifies classes by their ``perform()`` attribute.

    Three cases are distinguished when a class is created:

    * No ``perform`` attribute: the class is returned untouched.
    * ``perform`` is abstract: the class is an *actor interface*. The class
      keywords are stored on it as ``actor_name``, ``queue_name`` and
      ``actor_options``.
    * ``perform`` is concrete: the class is an *actor implementation*. It is
      instantiated, the instance is registered through ``actor()``, and the
      **instance** (not the class) is returned from the ``class`` statement.
    """

    # Queue used by interfaces that do not pass ``queue_name`` explicitly.
    DEFAULT_QUEUE_NAME = 'default'

    def __new__(mcs, name, bases, namespace, **kwargs):
        """Create the class and, where applicable, configure or register it.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            namespace: Class body namespace.
            **kwargs: Class keywords. ``actor_name`` (default: the class name)
                and ``queue_name`` (default: ``DEFAULT_QUEUE_NAME``) are read
                for interfaces; everything except ``actor_name`` is kept in
                ``actor_options`` and later forwarded to ``actor()``.

        Returns:
            The new class, or - for an actor implementation - an instance of
            the new class.

        Raises:
            TypeError: If a concrete ``perform()`` is defined on a class that
                does not inherit from any actor interface.
        """
        # Class keywords are actor configuration, so they are not forwarded
        # to ABCMeta.
        cls = super().__new__(mcs, name, bases, namespace)

        # Without perform() the class is neither an interface nor an
        # implementation (e.g. the common base class).
        if not hasattr(cls, 'perform'):
            return cls

        perform_method = getattr(cls, 'perform')

        # Actor Interface has an abstract method:
        if getattr(perform_method, '__isabstractmethod__', False) is True:
            # Store the actor name, the queue name and the remaining options
            # passed as class keywords on the interface class.
            cls.actor_name = kwargs.pop('actor_name', cls.__name__)
            cls.queue_name = kwargs.get('queue_name', mcs.DEFAULT_QUEUE_NAME)
            cls.actor_options = kwargs
            return cls

        # Actor Implementation: the configuration must come from an interface
        # somewhere in the bases.
        if not hasattr(cls, 'actor_name'):
            raise TypeError(
                f'{name} defines a concrete perform() but does not inherit '
                f'from an actor interface (a class with an abstract perform()).'
            )

        # Install __call__ first so the instance handed to actor() is already
        # callable.
        setattr(cls, '__call__', implementation_do_call)
        cls_instance = cls()

        # Always register on the queue the interface sends to, even when the
        # default came from DEFAULT_QUEUE_NAME rather than a class keyword.
        registration_options = {**cls.actor_options, 'queue_name': cls.queue_name}
        actor_instance = actor(cls_instance, actor_name=cls.actor_name, **registration_options)
        setattr(cls, '__actor__', actor_instance)
        return cls_instance
class BaseActorInterface(metaclass=BaseActorInterfaceMeta):
    """Base class for actor interfaces.

    Both methods read ``cls.queue_name`` and ``cls.actor_name``, which the
    metaclass sets on classes that declare an abstract ``perform()``.
    """

    @classmethod
    def send(cls, *args, **kwargs):
        """Build a message for this actor and enqueue it on the global broker.

        Args:
            *args: Positional arguments for the actor.
            **kwargs: Keyword arguments for the actor.

        Returns:
            Whatever the broker's ``enqueue()`` returns for the message.
        """
        message = cls.create_message(args, kwargs)
        return get_broker().enqueue(message)

    @classmethod
    def create_message(cls, args, kwargs, options=None):
        """Build a ``Message`` addressed to this actor without enqueueing it.

        Args:
            args: Positional arguments for the actor.
            kwargs: Keyword arguments for the actor.
            options: Optional message options; an empty dict is used when
                ``None`` is given.

        Returns:
            The constructed ``Message``.
        """
        return Message(
            queue_name=cls.queue_name,
            actor_name=cls.actor_name,
            args=args,
            kwargs=kwargs,
            options=options if options is not None else {},
        )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from shared_library import TaskOneInterface, TaskTwoInterface | |
# Call TaskOne with the positional arguments 10 and 5:
TaskOneInterface.send(10, 5)

# Call TaskTwo with a single string argument:
TaskTwoInterface.send('hello!')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from shared_library import TaskOneInterface | |
class TaskOne(TaskOneInterface):
    """Concrete implementation of ``TaskOneInterface``."""

    def perform(self, a: int, b: int) -> None:
        """Print ``a``, ``b`` and their sum in the form ``a + b = sum``."""
        print(f'{a} + {b} = {a + b}')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from shared_library import TaskTwoInterface | |
class TaskTwo(TaskTwoInterface):
    """Concrete implementation of ``TaskTwoInterface``."""

    def perform(self, val) -> None:
        """Print ``val`` prefixed with ``Task 2 value:``."""
        print(f'Task 2 value: {val}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment