Skip to content

Instantly share code, notes, and snippets.

@Corwinpro
Forked from jvkersch/actors.py
Created August 12, 2021 19:21
Show Gist options
  • Save Corwinpro/456535c79071fa71c2b5d7a1896f38f6 to your computer and use it in GitHub Desktop.
Save Corwinpro/456535c79071fa71c2b5d7a1896f38f6 to your computer and use it in GitHub Desktop.
Implementation of the actor model in Python
import abc
import asyncio
class MessageBox:
""" A class that can
- send messages of a certain type with some context (== put a message into
an internal queue),
- receive messages from the internal queue
"""
def __init__(self):
self._queue = asyncio.Queue()
async def send(self, message_type, *content):
await self._queue.put((message_type, content))
async def receive(self):
return await self._queue.get()
class Actor(abc.ABC):
""" Abstract base class for actors with custom logic.
Attributes
----------
message_box: a queue into which messages can be put, and read from.
_actor_tasks: a list of references to the children's _run tasks
"""
# Messages
ERROR = "error"
@abc.abstractmethod
async def run(self):
pass
def __init__(self, message_box=None, parent_message_box=None):
self.message_box = message_box or MessageBox()
self._parent_message_box = parent_message_box
self._actor_tasks = []
async def _run(self):
""" Execute the ``run`` method in a safe way, such that errors are
communicated to the parent message box, and all pending tasks are
cancelled in the end.
"""
result = None
try:
result = await self.run()
except asyncio.CancelledError:
pass
except Exception as e:
await self._parent_message_box.send(Actor.ERROR, e)
await self._cancel_tasks()
return result
async def _cancel_tasks(self):
for task in self._actor_tasks:
task.cancel()
try:
await asyncio.gather(*self._actor_tasks)
except asyncio.CancelledError:
pass
def spawn(self, actor_class, *args, **kwargs):
""" Create a new actor of type ``actor_class``, and run the actor with
the provided args/kwargs. Also, keep the reference to the created "run"
task of the new actor in ``self._actor_tasks``.
"""
actor = self._actor_factory(actor_class)
task = asyncio.create_task(actor._run(*args, **kwargs))
self._actor_tasks.append(task)
return actor.message_box
def _actor_factory(self, actor_class):
""" Create an object of type ``actor_class`` with a new ``message_box``
and a ``parent_message_box`` that is the self's message box.
"""
message_box = MessageBox()
actor = actor_class(
message_box=message_box, parent_message_box=self.message_box
)
return actor
class RootActor(Actor):
""" An actor that waits for its children and returns the result.
"""
async def run(self):
return await asyncio.gather(*self._actor_tasks)
import asyncio
from actors import Actor, RootActor
class Counter(Actor):
# Messages
INCREMENT = "increment"
DISPLAY = "display"
SHUTDOWN = "shutdown"
async def run(self):
print("starting")
count = 0
while True:
print(f"{id(self)} waiting for message")
message_type, *content = await self.message_box.receive()
print(f"{id(self)} got message", message_type)
await asyncio.sleep(0.5)
if message_type == Counter.INCREMENT:
count += 1
elif message_type == Counter.DISPLAY:
print(f"Counter {id(self)}: {count}")
elif message_type == Counter.SHUTDOWN:
break
async def main():
root = RootActor()
counter_pid1 = root.spawn(Counter)
counter_pid2 = root.spawn(Counter)
counter_pid3 = root.spawn(Counter)
print("spawned")
for _ in range(3):
await counter_pid1.send(Counter.INCREMENT)
await counter_pid2.send(Counter.INCREMENT)
await counter_pid3.send(Counter.INCREMENT)
print("done sending")
await counter_pid1.send(Counter.DISPLAY)
await counter_pid2.send(Counter.DISPLAY)
await counter_pid3.send(Counter.DISPLAY)
await counter_pid1.send(Counter.SHUTDOWN)
await counter_pid2.send(Counter.SHUTDOWN)
await counter_pid3.send(Counter.SHUTDOWN)
await root.run()
print("done running")
if __name__ == "__main__":
asyncio.run(main())
import asyncio
from actors import Actor, RootActor
class Processor(Actor):
""" An Actor that awaits for a (function, function argument, message box),
awaits for the function to execute with the argument, and sends the RECEIVE
signal with the result to the message box. Raises If the argument is 0.
The ``raise`` branch demostrantes what happens if a child actor raises.
"""
# Messages
PROCESS = "process"
async def run(self):
message_type, content = await self.message_box.receive()
if message_type == Processor.PROCESS:
fun, argument, message_box = content
if argument == 0:
raise RuntimeError(argument)
result = await fun(argument)
await message_box.send(Mapper.RECEIVE, result)
class Mapper(Actor):
""" An Actor that can handle 3 types of messages: MAP, RECEIVE, ERROR.
For a MAP message, the content is expected to be a function and a
iterable of arguments. For each item in arguments, spawn a ``Processor``
actor that will apply the function to the item.
For a RECEIVE message, which is expected to be sent by a ``Processor``,
store the received data, and if all spawned ``Processor`` s have finished,
break.
For an ERROR message, consider a ``Processor`` to finish, and store the
error.
"""
# Messages
MAP = "map"
RECEIVE = "receive"
async def run(self):
results = []
errors = []
n_to_receive = -1
while True:
message_type, content = await self.message_box.receive()
if message_type == Mapper.MAP:
# Spawn a new actor for each element in the iterable
fun, arguments_collection = content
n_to_receive = len(arguments_collection)
for argument in arguments_collection:
processor = self.spawn(Processor)
await processor.send(
Processor.PROCESS, fun, argument, self.message_box
)
elif message_type == Mapper.RECEIVE:
data, = content
results.append(data)
print(f"received data {data}")
elif message_type == Mapper.ERROR:
error, = content
errors.append(error)
print(f"got error {error!r}")
if n_to_receive == len(results) + len(errors):
break
print("done")
return results
async def main():
root = RootActor()
async def f(x):
""" An awaitable that returns some data and can potentially raise.
"""
if x % 3 == 0:
raise ValueError(x)
await asyncio.sleep(0.5)
return x + 1
mapper_message_box = root.spawn(Mapper)
await mapper_message_box.send(Mapper.MAP, f, range(10))
result = await root.run()
print(result)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment