-
-
Save Corwinpro/456535c79071fa71c2b5d7a1896f38f6 to your computer and use it in GitHub Desktop.
Implementation of the actor model in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
import asyncio | |
class MessageBox:
    """Mailbox built on top of an ``asyncio.Queue``.

    A message is stored as a ``(message_type, content)`` pair, where
    ``content`` is the tuple of positional values handed to ``send``.
    """

    def __init__(self):
        self._queue = asyncio.Queue()

    async def send(self, message_type, *content):
        """Enqueue a message of ``message_type`` carrying ``content``."""
        envelope = (message_type, content)
        await self._queue.put(envelope)

    async def receive(self):
        """Wait for the next queued message and return it as
        ``(message_type, content)``.
        """
        envelope = await self._queue.get()
        return envelope
class Actor(abc.ABC):
    """ Abstract base class for actors with custom logic.

    Subclasses implement ``run``. ``spawn`` schedules a child's ``_run``
    wrapper, which reports the child's errors to this actor's message box.

    Attributes
    ----------
    message_box: a queue into which messages can be put, and read from.
    _parent_message_box: the message box of the actor that spawned this one,
        or ``None`` when the actor was created without a parent.
    _actor_tasks: a list of references to the children's _run tasks
    """

    # Messages
    ERROR = "error"

    def __init__(self, message_box=None, parent_message_box=None):
        # Compare against None explicitly so that a caller-supplied box is
        # never replaced merely because it evaluates as falsy.
        self.message_box = MessageBox() if message_box is None else message_box
        self._parent_message_box = parent_message_box
        self._actor_tasks = []

    @abc.abstractmethod
    async def run(self, *args, **kwargs):
        """ The actor's custom logic. Receives whatever args/kwargs were
        passed to ``spawn`` (or to ``_run``); subclasses that need none may
        simply declare ``run(self)``.
        """
        pass

    async def _run(self, *args, **kwargs):
        """ Execute the ``run`` method in a safe way, such that errors are
        communicated to the parent message box, and all pending tasks are
        cancelled in the end.

        The args/kwargs are forwarded to ``run`` unchanged. Returns the value
        returned by ``run``, or ``None`` if ``run`` was cancelled or raised
        an error that was reported to the parent. If there is no parent
        message box to report to, the error is re-raised instead.
        """
        result = None
        try:
            result = await self.run(*args, **kwargs)
        except asyncio.CancelledError:
            # A cancelled actor finishes quietly; its children are still
            # cancelled by the ``finally`` clause below.
            pass
        except Exception as e:
            if self._parent_message_box is None:
                # Nobody to report to: surface the real error rather than
                # failing with an AttributeError on ``None``.
                raise
            await self._parent_message_box.send(Actor.ERROR, e)
        finally:
            # Runs on every exit path, including the re-raise above.
            await self._cancel_tasks()
        return result

    async def _cancel_tasks(self):
        """ Cancel every child task and wait until all of them have finished.
        """
        if not self._actor_tasks:
            return
        for task in self._actor_tasks:
            task.cancel()
        try:
            # ``return_exceptions=True`` makes gather wait for *all* children
            # instead of propagating the first child exception.
            await asyncio.gather(*self._actor_tasks, return_exceptions=True)
        except asyncio.CancelledError:
            pass

    def spawn(self, actor_class, *args, **kwargs):
        """ Create a new actor of type ``actor_class``, and run the actor with
        the provided args/kwargs. Also, keep the reference to the created "run"
        task of the new actor in ``self._actor_tasks``.

        Must be called while an event loop is running, because the child is
        scheduled with ``asyncio.create_task``. Returns the message box of the
        new actor.
        """
        actor = self._actor_factory(actor_class)
        task = asyncio.create_task(actor._run(*args, **kwargs))
        self._actor_tasks.append(task)
        return actor.message_box

    def _actor_factory(self, actor_class):
        """ Create an object of type ``actor_class`` with a new ``message_box``
        and a ``parent_message_box`` that is the self's message box.
        """
        message_box = MessageBox()
        actor = actor_class(
            message_box=message_box, parent_message_box=self.message_box
        )
        return actor
class RootActor(Actor):
    """Top-level actor whose ``run`` waits for every spawned child task and
    returns the collected results.
    """

    async def run(self):
        # Snapshot the child tasks that exist at the moment of the call.
        children = tuple(self._actor_tasks)
        gathered = await asyncio.gather(*children)
        return gathered
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from actors import Actor, RootActor | |
class Counter(Actor):
    """Actor that keeps an integer count and reacts to three messages:
    INCREMENT adds one, DISPLAY prints the current count, and SHUTDOWN ends
    the actor. Any other message type is ignored.
    """

    # Messages
    INCREMENT = "increment"
    DISPLAY = "display"
    SHUTDOWN = "shutdown"

    async def run(self):
        print("starting")
        count = 0
        running = True
        while running:
            print(f"{id(self)} waiting for message")
            # Only the message type matters here; any content is discarded.
            message_type, *_ = await self.message_box.receive()
            print(f"{id(self)} got message", message_type)
            # The pause happens before every message is handled, including
            # the shutdown message.
            await asyncio.sleep(0.5)
            if message_type == Counter.SHUTDOWN:
                running = False
            elif message_type == Counter.DISPLAY:
                print(f"Counter {id(self)}: {count}")
            elif message_type == Counter.INCREMENT:
                count += 1
async def main():
    """Spawn three counters, increment each three times, ask each to display
    its count and shut down, then wait for all of them to finish.
    """
    root = RootActor()
    counters = [root.spawn(Counter) for _ in range(3)]
    print("spawned")

    # Three rounds; within a round the counters are addressed in spawn order.
    for _ in range(3):
        for counter in counters:
            await counter.send(Counter.INCREMENT)
    print("done sending")

    for message in (Counter.DISPLAY, Counter.SHUTDOWN):
        for counter in counters:
            await counter.send(message)

    await root.run()
    print("done running")


if __name__ == "__main__":
    asyncio.run(main())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from actors import Actor, RootActor | |
class Processor(Actor):
    """ An Actor that waits for a single message. For a PROCESS message the
    content is a (function, function argument, message box) triple: the
    function is awaited with the argument, and the result is sent to the
    message box as a ``Mapper.RECEIVE`` message. Raises ``RuntimeError`` if
    the argument is 0.

    The ``raise`` branch demonstrates what happens if a child actor raises.
    """

    # Messages
    PROCESS = "process"

    async def run(self):
        message_type, content = await self.message_box.receive()
        if message_type != Processor.PROCESS:
            # Anything other than PROCESS is ignored and the actor finishes.
            return

        fun, argument, reply_box = content
        if argument == 0:
            raise RuntimeError(argument)

        outcome = await fun(argument)
        await reply_box.send(Mapper.RECEIVE, outcome)
class Mapper(Actor):
    """ An Actor that can handle 3 types of messages: MAP, RECEIVE, ERROR.

    For a MAP message, the content is expected to be a function and an
    iterable of arguments. For each item in arguments, spawn a ``Processor``
    actor that will apply the function to the item. Any iterable is accepted,
    including one-shot iterators such as generators.

    For a RECEIVE message, which is expected to be sent by a ``Processor``,
    store the received data.

    For an ERROR message, consider a ``Processor`` to have finished, and
    store the error.

    After each message, if the number of stored results and errors equals
    the number of arguments of the last MAP message, stop and return the
    list of results.
    """

    # Messages
    MAP = "map"
    RECEIVE = "receive"

    async def run(self):
        results = []
        errors = []
        # -1 can never equal a count, so the loop cannot finish before a MAP
        # message has been handled.
        n_to_receive = -1
        while True:
            message_type, content = await self.message_box.receive()
            if message_type == Mapper.MAP:
                # Spawn a new actor for each element in the iterable
                fun, arguments_collection = content
                # Materialise once: the collection may be an iterator that
                # has no ``len()`` and can only be traversed a single time.
                arguments = list(arguments_collection)
                n_to_receive = len(arguments)
                for argument in arguments:
                    processor = self.spawn(Processor)
                    await processor.send(
                        Processor.PROCESS, fun, argument, self.message_box
                    )
            elif message_type == Mapper.RECEIVE:
                data, = content
                results.append(data)
                print(f"received data {data}")
            elif message_type == Mapper.ERROR:
                error, = content
                errors.append(error)
                print(f"got error {error!r}")
            # Every argument yields exactly one RECEIVE or one ERROR message.
            if n_to_receive == len(results) + len(errors):
                break
        print("done")
        return results
async def main():
    """Spawn a ``Mapper`` under a root actor, ask it to map a function that
    can raise over ``range(10)``, and print what the root actor collects.
    """

    async def flaky_increment(x):
        """ An awaitable that returns ``x + 1`` after a short pause and
        raises ``ValueError`` when ``x`` is a multiple of 3.
        """
        if x % 3 == 0:
            raise ValueError(x)
        await asyncio.sleep(0.5)
        return x + 1

    root = RootActor()
    mapper_box = root.spawn(Mapper)
    await mapper_box.send(Mapper.MAP, flaky_increment, range(10))

    collected = await root.run()
    print(collected)


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment