Created
March 21, 2021 14:05
-
-
Save jvkersch/5265b15a8d9cc85afe16b0e7c51d76af to your computer and use it in GitHub Desktop.
Implementation of the actor model in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
import asyncio | |
class MessageBox: | |
def __init__(self): | |
self._queue = asyncio.Queue() | |
async def send(self, message_type, *content): | |
await self._queue.put((message_type, content)) | |
async def receive(self): | |
return await self._queue.get() | |
class Actor(abc.ABC): | |
""" Abstract base class for actors with custom logic. | |
""" | |
# Messages | |
ERROR = "error" | |
@abc.abstractmethod | |
async def run(self): | |
pass | |
def __init__(self, message_box=None, parent_message_box=None): | |
self.message_box = message_box or MessageBox() | |
self._parent_message_box = parent_message_box | |
self._actor_tasks = [] | |
async def _run(self): | |
result = None | |
try: | |
result = await self.run() | |
except asyncio.CancelledError: | |
pass | |
except Exception as e: | |
await self._parent_message_box.send(Actor.ERROR, e) | |
await self._cleanup_children() | |
return result | |
def spawn(self, actor_class, *args, **kwds): | |
actor = self._actor_factory(actor_class) | |
task = asyncio.create_task(actor._run(*args, **kwds)) | |
self._actor_tasks.append(task) | |
return actor.message_box | |
def _actor_factory(self, actor_class): | |
message_box = MessageBox() | |
actor = actor_class( | |
message_box=message_box, parent_message_box=self.message_box | |
) | |
return actor | |
async def _cleanup_children(self): | |
for task in self._actor_tasks: | |
task.cancel() | |
try: | |
await asyncio.gather(*self._actor_tasks) | |
except asyncio.CancelledError: | |
pass | |
class RootActor(Actor): | |
""" An actor that waits for its children and results the result. | |
""" | |
async def run(self): | |
return await asyncio.gather(*self._actor_tasks) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from actors import Actor, RootActor | |
class Counter(Actor): | |
# Messages | |
INCREMENT = "increment" | |
DISPLAY = "display" | |
SHUTDOWN = "shutdown" | |
async def run(self): | |
print("starting") | |
count = 0 | |
while True: | |
print(f"{id(self)} waiting for message") | |
message_type, *content = await self.message_box.receive() | |
print(f"{id(self)} got message", message_type) | |
await asyncio.sleep(0.5) | |
if message_type == Counter.INCREMENT: | |
count += 1 | |
elif message_type == Counter.DISPLAY: | |
print(f"Counter {id(self)}: {count}") | |
elif message_type == Counter.SHUTDOWN: | |
break | |
async def main(): | |
root = RootActor() | |
counter_pid1 = root.spawn(Counter) | |
counter_pid2 = root.spawn(Counter) | |
counter_pid3 = root.spawn(Counter) | |
print("spawned") | |
for _ in range(3): | |
await counter_pid1.send(Counter.INCREMENT) | |
await counter_pid2.send(Counter.INCREMENT) | |
await counter_pid3.send(Counter.INCREMENT) | |
print("done sending") | |
await counter_pid1.send(Counter.DISPLAY) | |
await counter_pid2.send(Counter.DISPLAY) | |
await counter_pid3.send(Counter.DISPLAY) | |
await counter_pid1.send(Counter.SHUTDOWN) | |
await counter_pid2.send(Counter.SHUTDOWN) | |
await counter_pid3.send(Counter.SHUTDOWN) | |
await root.run() | |
print("done running") | |
if __name__ == "__main__": | |
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from actors import Actor, RootActor | |
class Processor(Actor): | |
# Messages | |
PROCESS = "process" | |
async def run(self): | |
message_type, content = await self.message_box.receive() | |
if message_type == Processor.PROCESS: | |
fun, elem, mapper = content | |
if elem % 3 == 0: | |
raise ValueError() | |
result = await fun(elem) | |
await mapper.send(Mapper.RECEIVE, result) | |
class Mapper(Actor): | |
# Messages | |
MAP = "map" | |
RECEIVE = "receive" | |
async def run(self): | |
results = [] | |
n_to_receive = -1 | |
while True: | |
message_type, content = await self.message_box.receive() | |
if message_type == Mapper.MAP: | |
# Spawn a new actor for each element in the iterable | |
fun, elems = content | |
n_to_receive = len(elems) | |
for elem in elems: | |
proc = self.spawn(Processor) | |
await proc.send(Processor.PROCESS, fun, elem, self.message_box) | |
elif message_type == Mapper.RECEIVE: | |
results.append(content[0]) | |
print("received data", content[0]) | |
if n_to_receive == len(results): | |
break | |
elif message_type == Mapper.ERROR: | |
n_to_receive -= 1 | |
print("got error", content) | |
print("done") | |
return results | |
async def main(): | |
root = RootActor() | |
async def f(x): | |
await asyncio.sleep(0.5) | |
return x + 1 | |
mapper_pid = root.spawn(Mapper) | |
await mapper_pid.send(Mapper.MAP, f, range(10)) | |
retval = await root.run() | |
print(retval) | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Many thanks!