Created
August 18, 2017 14:34
Getting Started Guide in plain text format
Akka Documentation - Getting Started Guide | |
------------------------------------------ | |
Version 2.5.4 | |
Also in HTML format: | |
http://doc.akka.io/docs/akka/current/scala/guide/index.html | |
http://doc.akka.io/docs/akka/current/java/guide/index.html | |
© 2011-2017 Lightbend | |
Akka is Open Source and available under the Apache 2 License. | |
INTRODUCTION TO AKKA | |
Welcome to Akka, a set of open-source libraries for designing scalable, | |
resilient systems that span processor cores and networks. Akka allows | |
you to focus on meeting business needs instead of writing low-level code | |
to provide reliable behavior, fault tolerance, and high performance. | |
Many common practices and accepted programming models do not address | |
important challenges inherent in designing systems for modern computer | |
architectures. To be successful, distributed systems must cope in an | |
environment where components crash without responding, messages get lost | |
without a trace on the wire, and network latency fluctuates. These | |
problems occur regularly in carefully managed intra-datacenter | |
environments - even more so in virtualized architectures. | |
To help you deal with these realities, Akka provides: | |
- Multi-threaded behavior without the use of low-level concurrency | |
constructs like atomics or locks — relieving you from even thinking | |
about memory visibility issues. | |
- Transparent remote communication between systems and their | |
components — relieving you from writing and maintaining difficult | |
networking code. | |
- A clustered, high-availability architecture that is elastic, scales | |
in or out, on demand — enabling you to deliver a truly reactive | |
system. | |
Akka’s use of the actor model provides a level of abstraction that makes | |
it easier to write correct concurrent, parallel and distributed systems. | |
The actor model spans the full set of Akka libraries, providing you with | |
a consistent way of understanding and using them. Thus, Akka offers a | |
depth of integration that you cannot achieve by picking libraries to | |
solve individual problems and trying to piece them together. | |
By learning Akka and how to use the actor model, you will gain access to | |
a vast and deep set of tools that solve difficult distributed/parallel | |
systems problems in a uniform programming model where everything fits | |
together tightly and efficiently. | |
How to get started | |
If this is your first experience with Akka, we recommend that you start | |
by running a simple Hello World project. See the Quickstart Guide for
instructions on downloading and running the Hello
World example. The _Quickstart_ guide walks you through example code | |
that introduces how to define actor systems, actors, and messages as | |
well as how to use the test module and logging. Within 30 minutes, you | |
should be able to run the Hello World example and learn how it is | |
constructed. | |
This _Getting Started_ guide provides the next level of information. It | |
covers why the actor model fits the needs of modern distributed systems | |
and includes a tutorial that will help further your knowledge of Akka. | |
Topics include: | |
- Why modern systems need a new programming model | |
- How the actor model meets the needs of concurrent, distributed | |
systems | |
- Overview of Akka libraries and modules | |
- A more complex example that builds on the Hello World example to | |
illustrate common Akka patterns. | |
WHY MODERN SYSTEMS NEED A NEW PROGRAMMING MODEL | |
The actor model was proposed decades ago by Carl Hewitt as a way to | |
handle parallel processing in a high performance network — an | |
environment that was not available at the time. Today, hardware and | |
infrastructure capabilities have caught up with and exceeded Hewitt’s | |
vision. Consequently, organizations building distributed systems with | |
demanding requirements encounter challenges that cannot fully be solved | |
with a traditional object-oriented programming (OOP) model, but that can | |
benefit from the actor model. | |
Today, the actor model is not only recognized as a highly effective | |
solution — it has been proven in production for some of the world’s most | |
demanding applications. To highlight issues that the actor model | |
addresses, this topic discusses the following mismatches between | |
traditional programming assumptions and the reality of modern | |
multi-threaded, multi-CPU architectures: | |
- The challenge of encapsulation | |
- The illusion of shared memory on modern computer architectures | |
- The illusion of a call stack
The challenge of encapsulation | |
A core pillar of OOP is _encapsulation_. Encapsulation dictates that the | |
internal data of an object is not accessible directly from the outside; | |
it can only be modified by invoking a set of curated methods. The object | |
is responsible for exposing safe operations that protect the invariant | |
nature of its encapsulated data. | |
For example, operations on an ordered binary tree implementation must | |
not allow violation of the tree ordering invariant. Callers expect the | |
ordering to be intact and when querying the tree for a certain piece of | |
data, they need to be able to rely on this constraint. | |
When we analyze OOP runtime behavior, we sometimes draw a message | |
sequence chart showing the interactions of method calls. For example: | |
[sequence chart] | |
Unfortunately, the above diagram does not accurately represent the | |
_lifelines_ of the instances during execution. In reality, a _thread_ | |
executes all these calls, and the enforcement of invariants occurs on | |
the same thread from which the method was called. Updating the diagram | |
with the thread of execution, it looks like this: | |
[sequence chart with thread] | |
The significance of this clarification becomes clear when you try to | |
model what happens with _multiple threads_. Suddenly, our neatly drawn | |
diagram becomes inadequate. We can try to illustrate multiple threads | |
accessing the same instance: | |
[sequence chart with threads interacting] | |
There is a section of execution where two threads enter the same method. | |
Unfortunately, the encapsulation model of objects does not guarantee | |
anything about what happens in that section. Instructions of the two | |
invocations can be interleaved in arbitrary ways which eliminate any | |
hope for keeping the invariants intact without some type of coordination | |
between two threads. Now, imagine this issue compounded by the existence | |
of many threads. | |
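The interleaving described above can be replayed deterministically on a
single thread. The following Java sketch is illustrative only: the
Counter and LostUpdateDemo classes are hypothetical names invented for
this example. It splits an unsynchronized increment into its read and
write steps and orders them the way two contending threads might:

```java
// Hypothetical example: the invariant is
// "value equals the number of completed increments".
final class Counter {
    int value = 0;

    // An unsynchronized increment is really two steps: read, then write.
    int read() { return value; }
    void write(int newValue) { value = newValue; }
}

final class LostUpdateDemo {
    // Replays one possible interleaving of two threads, A and B,
    // each performing a single increment on the same counter.
    static int interleavedIncrements() {
        Counter counter = new Counter();
        int seenByA = counter.read();   // A reads 0
        int seenByB = counter.read();   // B reads 0 before A has written
        counter.write(seenByA + 1);     // A writes 1
        counter.write(seenByB + 1);     // B also writes 1, overwriting A's update
        return counter.value;           // two increments completed, one was lost
    }
}
```

Both increments ran to completion, yet the invariant is broken. Nothing
in the object's encapsulation prevents this ordering.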
The common approach to solving this problem is to add a lock around | |
these methods. While this ensures that at most one thread will enter the | |
method at any given time, this is a very costly strategy: | |
- Locks _seriously limit_ concurrency. They are very costly on modern
CPU architectures, requiring heavy lifting from the operating system
to suspend the thread and restore it later.
- The caller thread is now blocked, so it cannot do any other
meaningful work. Even in desktop applications this is unacceptable:
we want the user-facing parts of an application (its UI) to stay
responsive even when a long background job is running. In the
backend, blocking is outright wasteful. One might think that this
can be compensated for by launching new threads, but threads are also
a costly abstraction.
- Locks introduce a new menace: deadlocks. | |
These realities result in a no-win situation: | |
- Without sufficient locks, the state gets corrupted. | |
- With many locks in place, performance suffers and deadlocks occur
very easily.
Additionally, locks only really work well locally. When it comes to | |
coordinating across multiple machines, the only alternative is | |
distributed locks. Unfortunately, distributed locks are several orders
of magnitude less efficient than local locks and usually impose a hard
limit on scaling out. Distributed lock protocols require several | |
communication round-trips over the network across multiple machines, so | |
latency goes through the roof. | |
In object-oriented languages we rarely think about threads or linear
execution paths in general. We often envision a system as a network of | |
object instances that react to method calls, modify their internal | |
state, then communicate with each other via method calls driving the | |
whole application state forward: | |
[network of interacting objects] | |
However, in a multi-threaded distributed environment, what actually | |
happens is that threads “traverse” this network of object instances by | |
following method calls. As a result, threads are what really drive | |
execution: | |
[network of interactive objects traversed by threads] | |
IN SUMMARY: | |
- OBJECTS CAN ONLY GUARANTEE ENCAPSULATION (PROTECTION OF INVARIANTS)
IN THE FACE OF SINGLE-THREADED ACCESS. MULTI-THREADED EXECUTION ALMOST
ALWAYS LEADS TO CORRUPTED INTERNAL STATE. EVERY INVARIANT CAN BE
VIOLATED BY HAVING TWO CONTENDING THREADS IN THE SAME CODE SEGMENT.
- WHILE LOCKS SEEM TO BE THE NATURAL REMEDY TO UPHOLD ENCAPSULATION | |
WITH MULTIPLE THREADS, IN PRACTICE THEY ARE INEFFICIENT AND EASILY | |
LEAD TO DEADLOCKS IN ANY APPLICATION OF REAL-WORLD SCALE. | |
- LOCKS WORK LOCALLY. ATTEMPTS TO MAKE THEM DISTRIBUTED EXIST, BUT
OFFER LIMITED POTENTIAL FOR SCALING OUT.
The illusion of shared memory on modern computer architectures | |
Programming models of the 1980s and 1990s conceptualize writing to a
variable as writing to a memory location directly (a picture already
muddied by the fact that local variables might exist only in
registers).
On modern architectures - if we simplify things a bit - CPUs are writing | |
to cache lines instead of writing to memory directly. Most of these | |
caches are local to the CPU core, that is, writes by one core are not
visible to another. In order to make local changes visible to another
core, and hence to another thread, the cache line needs to be shipped to | |
the other core’s cache. | |
On the JVM, we have to explicitly denote memory locations to be shared | |
across threads by using _volatile_ markers or Atomic wrappers. | |
Otherwise, we can access them only in a locked section. Why don’t we | |
just mark all variables as volatile? Because shipping cache lines across | |
cores is a very costly operation! Doing so would implicitly stall the | |
cores involved from doing additional work, and result in bottlenecks on | |
the cache coherence protocol (the protocol CPUs use to transfer cache | |
lines between main memory and other CPUs). The result is a slowdown of
orders of magnitude.
Even for developers aware of this situation, figuring out which memory
locations should be marked as volatile, or which atomic structures to
use, is a dark art.
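As a small illustration of explicitly marking shared state on the JVM,
the sketch below uses the standard java.util.concurrent.atomic
AtomicInteger class. SharedCounterDemo and its run method are
hypothetical names chosen for this example. Every increment is an atomic
read-modify-write that the other cores must observe, which is exactly
the cross-core traffic described above:

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SharedCounterDemo {
    // Explicitly shared between threads: each increment is atomic and
    // becomes visible to all other threads.
    private final AtomicInteger hits = new AtomicInteger();

    // Starts the given number of threads, lets each one increment the
    // shared counter, waits for all of them, and returns the total.
    static int run(int threads, int incrementsPerThread) {
        SharedCounterDemo demo = new SharedCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < incrementsPerThread; n++) {
                    demo.hits.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo.hits.get();
    }
}
```

With a plain int field in place of the AtomicInteger, the total could
come out lower than expected, because concurrent increments may
overwrite each other.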
IN SUMMARY: | |
- THERE IS NO REAL SHARED MEMORY ANYMORE, CPU CORES PASS CHUNKS OF | |
DATA (CACHE LINES) EXPLICITLY TO EACH OTHER JUST AS COMPUTERS ON A | |
NETWORK DO. INTER-CPU COMMUNICATION AND NETWORK COMMUNICATION HAVE | |
MORE IN COMMON THAN MANY REALIZE. PASSING MESSAGES IS THE NORM NOW | |
BE IT ACROSS CPUS OR NETWORKED COMPUTERS. | |
- INSTEAD OF HIDING THE MESSAGE PASSING ASPECT THROUGH VARIABLES | |
MARKED AS SHARED OR USING ATOMIC DATA STRUCTURES, A MORE DISCIPLINED | |
AND PRINCIPLED APPROACH IS TO KEEP STATE LOCAL TO A CONCURRENT | |
ENTITY AND PROPAGATE DATA OR EVENTS BETWEEN CONCURRENT ENTITIES | |
EXPLICITLY VIA MESSAGES. | |
The illusion of a call stack | |
Today, we often take call stacks for granted. But, they were invented in | |
an era where concurrent programming was not as important because | |
multi-CPU systems were not common. Call stacks do not cross threads and | |
hence, do not model asynchronous call chains. | |
The problem arises when a thread intends to delegate a task to the | |
“background”. In practice, this really means delegating to another | |
thread. This cannot be a simple method/function call because calls are | |
strictly local to the thread. What usually happens is that the “caller”
puts an object into a memory location shared by a worker thread | |
(“callee”), which in turn, picks it up in some event loop. This allows | |
the “caller” thread to move on and do other tasks. | |
The first issue is, how can the “caller” be notified of the completion | |
of the task? But a more serious issue arises when a task fails with an | |
exception. Where does the exception propagate to? It will propagate to | |
the exception handler of the worker thread completely ignoring who the | |
actual “caller” was: | |
[exceptions cannot propagate between different threads] | |
This is a serious problem. How does the worker thread deal with the | |
situation? It likely cannot fix the issue as it is usually oblivious of | |
the purpose of the failed task. The “caller” thread needs to be notified | |
somehow, but there is no call stack to unwind with an exception. Failure | |
notification can only be done via a side-channel, for example putting an | |
error code where the “caller” thread otherwise expects the result once | |
ready. If this notification is not in place, the “caller” never gets | |
notified of a failure and the task is lost! THIS IS SURPRISINGLY SIMILAR | |
TO HOW NETWORKED SYSTEMS WORK WHERE MESSAGES/REQUESTS CAN GET LOST/FAIL | |
WITHOUT ANY NOTIFICATION. | |
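The side-channel idea can be sketched with plain JDK classes. In the
following example, Worker, Job, and SideChannelDemo are hypothetical
names; only BlockingQueue and CompletableFuture come from the standard
library. The worker thread catches the exception that cannot unwind into
the caller's stack and reports it through the place where the caller
expects the result. The caller also applies a timeout, since a reply may
never arrive:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class Worker {
    // A task paired with the place where the caller expects its outcome.
    private static final class Job {
        final Supplier<String> task;
        final CompletableFuture<String> outcome = new CompletableFuture<>();
        Job(Supplier<String> task) { this.task = task; }
    }

    private final BlockingQueue<Job> queue = new LinkedBlockingQueue<>();

    Worker() {
        Thread thread = new Thread(this::eventLoop, "worker");
        thread.setDaemon(true);
        thread.start();
    }

    // The "caller" hands over the task and is free to continue.
    CompletableFuture<String> submit(Supplier<String> task) {
        Job job = new Job(task);
        queue.add(job);
        return job.outcome;
    }

    private void eventLoop() {
        try {
            while (true) {
                Job job = queue.take();
                try {
                    job.outcome.complete(job.task.get());
                } catch (RuntimeException e) {
                    // No caller stack to unwind: signal the failure explicitly.
                    job.outcome.completeExceptionally(e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

final class SideChannelDemo {
    static String describe(Supplier<String> task) {
        CompletableFuture<String> outcome = new Worker().submit(task);
        try {
            return "ok: " + outcome.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

If the catch block in eventLoop were missing, the exception would end
the worker thread and the caller would learn about the failure only
through the timeout.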
This bad situation gets worse when things go really wrong and a worker | |
backed by a thread encounters a bug and ends up in an unrecoverable | |
situation. For example, an internal exception caused by a bug bubbles up | |
to the root of the thread and makes the thread shut down. This | |
immediately raises the question, who should restart the normal operation | |
of the service hosted by the thread, and how should it be restored to a | |
known-good state? At first glance, this might seem manageable, but we
are suddenly faced by a new, unexpected phenomenon: the actual task that
the thread was currently working on is no longer in the shared memory
location where tasks are taken from (usually a queue). In fact, due to
the exception reaching to the top, unwinding all of the call stack, the | |
task state is fully lost! WE HAVE LOST A MESSAGE EVEN THOUGH THIS IS | |
LOCAL COMMUNICATION WITH NO NETWORKING INVOLVED (WHERE MESSAGE LOSSES | |
ARE TO BE EXPECTED). | |
IN SUMMARY: | |
- TO ACHIEVE ANY MEANINGFUL CONCURRENCY AND PERFORMANCE ON CURRENT | |
SYSTEMS, THREADS MUST DELEGATE TASKS AMONG EACH OTHER IN AN | |
EFFICIENT WAY WITHOUT BLOCKING. WITH THIS STYLE OF TASK-DELEGATING | |
CONCURRENCY (AND EVEN MORE SO WITH NETWORKED/DISTRIBUTED COMPUTING) | |
CALL STACK-BASED ERROR HANDLING BREAKS DOWN AND NEW, EXPLICIT ERROR | |
SIGNALING MECHANISMS NEED TO BE INTRODUCED. FAILURES BECOME PART OF | |
THE DOMAIN MODEL. | |
- CONCURRENT SYSTEMS WITH WORK DELEGATION NEED TO HANDLE SERVICE
FAULTS AND HAVE PRINCIPLED MEANS TO RECOVER FROM THEM. CLIENTS OF | |
SUCH SERVICES NEED TO BE AWARE THAT TASKS/MESSAGES MIGHT GET LOST | |
DURING RESTARTS. EVEN IF LOSS DOES NOT HAPPEN, A RESPONSE MIGHT BE | |
DELAYED ARBITRARILY DUE TO PREVIOUSLY ENQUEUED TASKS (A LONG QUEUE), | |
DELAYS CAUSED BY GARBAGE COLLECTION, ETC. IN FACE OF THESE, | |
CONCURRENT SYSTEMS SHOULD HANDLE RESPONSE DEADLINES IN THE FORM OF | |
TIMEOUTS, JUST LIKE NETWORKED/DISTRIBUTED SYSTEMS. | |
Next, let’s see how use of the actor model can overcome these | |
challenges. | |
HOW THE ACTOR MODEL MEETS THE NEEDS OF MODERN, DISTRIBUTED SYSTEMS | |
As described in the previous topic, common programming practices do not | |
properly address the needs of demanding modern systems. Thankfully, we | |
don’t need to scrap everything we know. Instead, the actor model | |
addresses these shortcomings in a principled way, allowing systems to | |
behave in a way that better matches our mental model. The actor model | |
abstraction allows you to think about your code in terms of | |
communication, not unlike the exchanges that occur between people in a | |
large organization. | |
Use of actors allows us to: | |
- Enforce encapsulation without resorting to locks. | |
- Use the model of cooperative entities reacting to signals, changing | |
state, and sending signals to each other to drive the whole | |
application forward. | |
- Stop worrying about an execution mechanism that is a mismatch to
our world view.
Usage of message passing avoids locking and blocking | |
Instead of calling methods, actors send messages to each other. Sending | |
a message does not transfer the thread of execution from the sender to | |
the destination. An actor can send a message and continue without | |
blocking. Therefore, it can accomplish more in the same amount of time. | |
With objects, when a method returns, it releases control of its | |
executing thread. In this respect, actors behave much like objects: they
react to messages and return execution when they finish processing the | |
current message. In this way, actors actually achieve the execution we | |
imagined for objects: | |
[actors interact with each other by sending messages] | |
An important difference between passing messages and calling methods is | |
that messages have no return value. By sending a message, an actor | |
delegates work to another actor. As we saw in The illusion of a call | |
stack, if it expected a return value, the sending actor would either | |
need to block or to execute the other actor’s work on the same thread. | |
Instead, the receiving actor delivers the results in a reply message. | |
The second key change we need in our model is to reinstate | |
encapsulation. Actors react to messages just like objects “react” to | |
methods invoked on them. The difference is that instead of multiple | |
threads “protruding” into our actor and wreaking havoc to internal state | |
and invariants, actors execute independently from the senders of a | |
message, and they react to incoming messages sequentially, one at a | |
time. While each actor processes messages sent to it sequentially, | |
different actors work concurrently with each other so that an actor | |
system can process as many messages simultaneously as the hardware will | |
support. | |
Since there is always at most one message being processed per actor, the | |
invariants of an actor can be kept without synchronization. This happens | |
automatically without using locks: | |
[messages don’t invalidate invariants as they are processed | |
sequentially] | |
In summary, this is what happens when an actor receives a message: | |
1. The actor adds the message to the end of a queue. | |
2. If the actor was not scheduled for execution, it is marked as ready | |
to execute. | |
3. A (hidden) scheduler entity takes the actor and starts executing it. | |
4. The actor picks the message from the front of the queue.
5. The actor modifies internal state and sends messages to other actors.
6. The actor is unscheduled. | |
To accomplish this behavior, actors have: | |
- A mailbox (the queue where messages end up). | |
- A behavior (the state of the actor, internal variables etc.). | |
- Messages (pieces of data representing a signal, similar to method | |
calls and their parameters). | |
- An execution environment (the machinery that takes actors that have | |
messages to react to and invokes their message handling code). | |
- An address (more on this later). | |
Messages go into actor mailboxes. The behavior of the actor describes | |
how the actor responds to messages (like sending more messages and/or | |
changing state). An execution environment orchestrates a pool of threads | |
to drive all these actions completely transparently. | |
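To make the steps and ingredients above concrete, here is a deliberately
minimal sketch in plain Java. It is not Akka's implementation or API:
MiniActor, CounterDemo, INCREMENT, and GetCount are hypothetical names,
and only the java.util.concurrent classes are real. The comments refer
to the numbered steps listed earlier:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class MiniActor<M> {
    private final ConcurrentLinkedQueue<M> mailbox = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final Executor executor;     // the execution environment
    private final Consumer<M> behavior;  // how the actor reacts to a message

    MiniActor(Executor executor, Consumer<M> behavior) {
        this.executor = executor;
        this.behavior = behavior;
    }

    // Sending never blocks the sender and never runs the behavior on its thread.
    void tell(M message) {
        mailbox.add(message);   // step 1: enqueue at the end of the mailbox
        scheduleIfNeeded();     // step 2: mark as ready unless already scheduled
    }

    private void scheduleIfNeeded() {
        if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true)) {
            executor.execute(this::processOne);  // step 3: hand over to a pool thread
        }
    }

    private void processOne() {
        try {
            M message = mailbox.poll();                      // step 4: take from the front
            if (message != null) behavior.accept(message);   // step 5: react
        } finally {
            scheduled.set(false);   // step 6: unschedule
            scheduleIfNeeded();     // more mail may have arrived in the meantime
        }
    }
}

final class CounterDemo {
    static final Object INCREMENT = new Object();

    // A query that carries the place where the reply message is delivered.
    static final class GetCount {
        final CompletableFuture<Integer> replyTo = new CompletableFuture<>();
    }

    static int run(int senders, int messagesPerSender) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            int[] count = {0};  // plain, unsynchronized state owned by the actor
            MiniActor<Object> counter = new MiniActor<>(pool, message -> {
                if (message == INCREMENT) {
                    count[0]++;
                } else if (message instanceof GetCount) {
                    ((GetCount) message).replyTo.complete(count[0]);
                }
            });
            Thread[] threads = new Thread[senders];
            for (int i = 0; i < senders; i++) {
                threads[i] = new Thread(() -> {
                    for (int n = 0; n < messagesPerSender; n++) counter.tell(INCREMENT);
                });
                threads[i].start();
            }
            for (Thread thread : threads) thread.join();
            GetCount query = new GetCount();
            counter.tell(query);           // enqueued after every increment
            return query.replyTo.join();   // the result arrives as a reply
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the scheduled flag admits at most one processing run at a time,
the counter state needs no lock even though several threads send
concurrently.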
This is a very simple model and it solves the issues enumerated | |
previously: | |
- Encapsulation is preserved by decoupling execution from signaling | |
(method calls transfer execution, message passing does not). | |
- There is no need for locks. Modifying the internal state of an actor | |
is only possible via messages, which are processed one at a time | |
eliminating races when trying to keep invariants. | |
- There are no locks used anywhere, and senders are not blocked.
Millions of actors can be efficiently scheduled on a dozen threads,
reaching the full potential of modern CPUs. Task delegation
is the natural mode of operation for actors.
- The state of actors is local and not shared; changes and data are
propagated via messages, which maps to how the modern memory hierarchy
actually works. In many cases, this means transferring over only the | |
cache lines that contain the data in the message while keeping local | |
state and data cached at the original core. The same model maps | |
exactly to remote communication where the state is kept in the RAM | |
of machines and changes/data is propagated over the network as | |
packets. | |
Actors handle error situations gracefully | |
Since we no longer have a shared call stack between actors that send | |
messages to each other, we need to handle error situations differently. | |
There are two kinds of errors we need to consider: | |
- The first case is when the delegated task on the target actor failed | |
due to an error in the task (typically some validation issue, like a | |
non-existent user ID). In this case, the service encapsulated by the
target actor is intact; only the task itself is erroneous. The
service actor should reply to the sender with a message presenting
the error case. There is nothing special here: errors are part of
the domain and hence become ordinary messages.
- The second case is when a service itself encounters an internal | |
fault. Akka enforces that all actors are organized into a tree-like | |
hierarchy, i.e. an actor that creates another actor becomes the | |
parent of that new actor. This is very similar to how operating systems
organize processes into a tree. Just like with processes, when an | |
actor fails, its parent actor is notified and it can react to the | |
failure. Also, if the parent actor is stopped, all of its children | |
are recursively stopped, too. This service is called supervision and | |
it is central to Akka. | |
[actors supervise and handle the failures of child actors] | |
A supervisor (parent) can decide to restart its child actors on certain | |
types of failures or stop them completely on others. Children never go
silently dead (with the notable exception of entering an infinite loop).
Instead, they either fail, in which case their parent can react to the
fault, or they are stopped (in which case interested parties are
automatically notified). There is always a responsible entity for
managing an actor: its parent. Restarts are not visible from the
outside: collaborating actors can keep sending messages while the
target actor restarts.
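The restart-or-stop decision can be sketched without any Akka classes.
In the single-threaded Java example below, Directive, Supervised, Tally,
and SupervisionDemo are hypothetical names, and the mapping from
exception type to directive is an assumption made purely for
illustration. A restart replaces the failed child with a fresh instance,
while senders keep delivering to the same Supervised handle:

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

enum Directive { RESTART, STOP }

final class Supervised {
    private final Supplier<Consumer<String>> childFactory;
    private final Function<RuntimeException, Directive> decider;
    private Consumer<String> child;
    private boolean stopped = false;
    private int restarts = 0;

    Supervised(Supplier<Consumer<String>> childFactory,
               Function<RuntimeException, Directive> decider) {
        this.childFactory = childFactory;
        this.decider = decider;
        this.child = childFactory.get();
    }

    void deliver(String message) {
        if (stopped) return;  // a stopped child processes nothing further
        try {
            child.accept(message);
        } catch (RuntimeException failure) {
            // The failure reaches the supervisor, not the sender.
            if (decider.apply(failure) == Directive.RESTART) {
                child = childFactory.get();  // fresh instance in a known-good state
                restarts++;
            } else {
                stopped = true;
            }
        }
    }

    boolean isStopped() { return stopped; }
    int restarts() { return restarts; }
}

// A child that counts messages and fails on two special inputs.
final class Tally implements Consumer<String> {
    int processed = 0;

    @Override
    public void accept(String message) {
        if (message.equals("corrupt")) throw new IllegalStateException("state corrupted");
        if (message.equals("fatal")) throw new UnsupportedOperationException("cannot continue");
        processed++;
    }
}

final class SupervisionDemo {
    static String run() {
        Tally[] current = new Tally[1];  // lets the demo observe the current child
        Supervised supervised = new Supervised(
            () -> current[0] = new Tally(),
            failure -> failure instanceof IllegalStateException
                ? Directive.RESTART : Directive.STOP);

        supervised.deliver("a");
        supervised.deliver("b");
        int before = current[0].processed;
        supervised.deliver("corrupt");  // child fails and is restarted
        supervised.deliver("c");        // handled by the fresh child
        supervised.deliver("fatal");    // child fails and is stopped
        supervised.deliver("d");        // ignored: the child is stopped
        return "before=" + before + " after=" + current[0].processed
            + " restarts=" + supervised.restarts()
            + " stopped=" + supervised.isStopped();
    }
}
```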
Now, let’s take a short tour of the functionality Akka provides. | |
OVERVIEW OF AKKA LIBRARIES AND MODULES | |
Before delving into some best practices for writing actors, it will be | |
helpful to preview the most commonly used Akka libraries. This will help | |
you start thinking about the functionality you want to use in your | |
system. All core Akka functionality is available as Open Source Software | |
(OSS). Lightbend sponsors Akka development but can also help you with | |
commercial offerings such as training, consulting, support, and | |
Enterprise Suite — a comprehensive set of tools for managing Akka | |
systems. | |
The following capabilities are included with Akka OSS and are introduced | |
later on this page: | |
- Actor library | |
- Remoting | |
- Cluster | |
- Cluster Sharding | |
- Cluster Singleton | |
- Cluster Publish-Subscribe | |
- Persistence | |
- Distributed Data | |
- HTTP | |
With a Lightbend subscription, you can use Enterprise Suite in | |
production. Enterprise Suite includes the following extensions to Akka | |
core functionality: | |
- Split Brain Resolver — Detects and recovers from network partitions, | |
eliminating data inconsistencies and possible downtime. | |
- Configuration Checker — Checks for potential configuration issues | |
and logs suggestions. | |
- Diagnostics Recorder — Captures configuration and system information | |
in a format that makes it easy to troubleshoot issues during | |
development and production. | |
- Thread Starvation Detector — Monitors an Akka system dispatcher and | |
logs warnings if it becomes unresponsive. | |
This page does not list all available modules, but overviews the main | |
functionality and gives you an idea of the level of sophistication you | |
can reach when you start building systems on top of Akka. | |
Actor library | |
The core Akka library is akka-actor. But, actors are used across Akka | |
libraries, providing a consistent, integrated model that relieves you | |
from individually solving the challenges that arise in concurrent or | |
distributed system design. From a bird’s-eye view, actors are a
programming paradigm that takes encapsulation, one of the pillars of | |
OOP, to its extreme. Unlike objects, actors encapsulate not only their | |
state but their execution. Communication with actors is not via method | |
calls but by passing messages. While this difference may seem minor, it | |
is actually what allows us to break clean from the limitations of OOP | |
when it comes to concurrency and remote communication. Don’t worry if
this description feels too high level to fully grasp yet; in the next
chapter we will explain actors in detail. For now, the important point
is that this is a model that handles concurrency and distribution at the | |
fundamental level instead of ad hoc patched attempts to bring these | |
features to OOP. | |
Challenges that actors solve include the following: | |
- How to build and design high-performance, concurrent applications. | |
- How to handle errors in a multi-threaded environment. | |
- How to protect my project from the pitfalls of concurrency. | |
Remoting | |
Remoting enables actors that live on different computers to seamlessly
exchange messages. While distributed as a JAR artifact, Remoting | |
resembles a module more than it does a library. You enable it mostly | |
with configuration and it has only a few APIs. Thanks to the actor | |
model, a remote and local message send looks exactly the same. The | |
patterns that you use on local systems translate directly to remote | |
systems. You will rarely need to use Remoting directly, but it provides | |
the foundation on which the Cluster subsystem is built. | |
Challenges Remoting solves include the following: | |
- How to address actor systems living on remote hosts. | |
- How to address individual actors on remote actor systems. | |
- How to turn messages to bytes on the wire. | |
- How to manage low-level network connections (and reconnections)
between hosts, detect crashed actor systems and hosts, all | |
transparently. | |
- How to multiplex communications from an unrelated set of actors on | |
the same network connection, all transparently. | |
Cluster | |
If you have a set of actor systems that cooperate to solve some business | |
problem, then you likely want to manage this set of systems in a
disciplined way. While Remoting solves the problem of addressing and | |
communicating with components of remote systems, Clustering gives you | |
the ability to organize these into a “meta-system” tied together by a | |
membership protocol. IN MOST CASES, YOU WANT TO USE THE CLUSTER MODULE | |
INSTEAD OF USING REMOTING DIRECTLY. Clustering provides an additional | |
set of services on top of Remoting that most real world applications | |
need. | |
Challenges the Cluster module solves include the following: | |
- How to maintain a set of actor systems (a cluster) that can | |
communicate with each other and consider each other as part of the | |
cluster. | |
- How to introduce a new system safely to the set of already existing | |
members. | |
- How to reliably detect systems that are temporarily unreachable. | |
- How to remove failed hosts/systems (or scale down the system) so
that all remaining members agree on the remaining subset of the
cluster.
- How to distribute computations among the current set of members.
- How to designate members of the cluster to a certain role, in
other words, to provide certain services and not others.
Cluster Sharding | |
Sharding helps to solve the problem of distributing a set of actors | |
among members of an Akka cluster. Sharding is a pattern that is mostly
used together with Persistence to balance a large set of persistent
entities (backed by actors) across members of a cluster and also to
migrate them to other nodes when members crash or leave.
Challenges that Sharding solves include the following: | |
- How to model and scale out a large set of stateful entities on a set | |
of systems. | |
- How to ensure that entities in the cluster are distributed properly | |
so that load is properly balanced across the machines. | |
- How to migrate entities from a crashed system without losing their
state.
- How to ensure that an entity does not exist on multiple systems at
the same time and hence is kept consistent.
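One common way to spread entities is to hash the entity identifier into
a fixed number of shards and then place shards on nodes. The sketch
below shows only that idea. ShardLocator and its methods are
hypothetical names, and the modulo-based placement of shards on nodes is
a simplifying assumption, not the allocation strategy Akka uses:

```java
import java.util.List;

final class ShardLocator {
    private final int numberOfShards;

    ShardLocator(int numberOfShards) {
        this.numberOfShards = numberOfShards;
    }

    // The same entity id always maps to the same shard, so an entity
    // lives in exactly one place at a time.
    int shardOf(String entityId) {
        return Math.floorMod(entityId.hashCode(), numberOfShards);
    }

    // Naive placement: shards are spread over whichever nodes are alive.
    // When a node leaves, its shards map onto the remaining nodes.
    String nodeOf(String entityId, List<String> liveNodes) {
        return liveNodes.get(shardOf(entityId) % liveNodes.size());
    }
}
```

Keeping the number of shards fixed while the set of nodes changes means
that entity-to-shard assignments stay stable, and only whole shards have
to move when the cluster grows or shrinks.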
Cluster Singleton | |
A common (in fact, a bit too common) use case in distributed systems is | |
to have a single entity responsible for a given task which is shared | |
among other members of the cluster and migrated if the host system | |
fails. While this undeniably introduces a common bottleneck for the | |
whole cluster that limits scaling, there are scenarios where the use of | |
this pattern is unavoidable. Cluster singleton allows a cluster to | |
select an actor system which will host a particular actor while other | |
systems can always access said service independently from where it is. | |
The Singleton module can be used to solve these challenges: | |
- How to ensure that only one instance of a service is running in the | |
whole cluster. | |
- How to ensure that the service is up even if the system currently
hosting it crashes or is shut down during the process of scaling down.
- How to reach this instance from any member of the cluster assuming | |
that it can migrate to other systems over time. | |
Cluster Publish-Subscribe | |
For coordination among systems, it is often necessary to distribute | |
messages to all, or one system of a set of interested systems in a | |
cluster. This pattern is usually called publish-subscribe and this | |
module solves this exact problem. It is possible to broadcast messages | |
to all subscribers of a topic or send a message to an arbitrary actor | |
that has expressed interest. | |
Cluster Publish-Subscribe is intended to solve the following challenges: | |
- How to broadcast messages to an interested set of parties in a | |
cluster. | |
- How to send a message to a member from an interested set of parties | |
in a cluster. | |
- How to subscribe and unsubscribe for events of a certain topic in | |
the cluster. | |
Persistence | |
Just like objects in OOP, actors keep their state in volatile memory. | |
Once the system is shut down, gracefully or because of a crash, all data | |
that was in memory is lost. Persistence provides patterns to enable | |
actors to persist events that lead to their current state. Upon startup, | |
events can be replayed to restore the state of the entity hosted by the | |
actor. The event stream can be queried and fed into additional | |
processing pipelines (an external Big Data cluster for example) or | |
alternate views (like reports). | |
Persistence tackles the following challenges: | |
- How to restore the state of an entity/actor when the system restarts
or crashes.
- How to implement a CQRS system.
- How to ensure reliable delivery of messages in the face of network
errors and system crashes.
- How to introspect domain events that have led an entity to its
current state.
- How to leverage Event Sourcing in an application to support
long-running processes while the project continues to evolve.
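
To make the replay idea concrete, here is a minimal sketch in plain Java.
It does not use the Akka Persistence API; the names ItemAdded and
CartEntity are hypothetical and chosen only for illustration. It shows
state being rebuilt purely by applying stored events in order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical domain event: an item was added to a shopping cart.
final class ItemAdded {
    final String item;

    ItemAdded(String item) {
        this.item = item;
    }
}

// Hypothetical entity whose state is derived only from its events.
final class CartEntity {
    private final List<String> items = new ArrayList<>();

    // Applying an event is the only way the state changes.
    void apply(ItemAdded event) {
        items.add(event.item);
    }

    // Returns a copy so callers cannot mutate the entity's state.
    List<String> items() {
        return new ArrayList<>(items);
    }

    // Rebuild the state after a restart by replaying the journal in order.
    static CartEntity recover(List<ItemAdded> journal) {
        CartEntity cart = new CartEntity();
        for (ItemAdded event : journal) {
            cart.apply(event);
        }
        return cart;
    }

    public static void main(String[] args) {
        List<ItemAdded> journal = new ArrayList<>();
        journal.add(new ItemAdded("book"));
        journal.add(new ItemAdded("pen"));
        System.out.println(CartEntity.recover(journal).items()); // prints [book, pen]
    }
}
```

Because the journal is the source of truth, the same list of events can
also be fed to other consumers, such as a reporting view.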
Distributed Data | |
In situations where eventual consistency is acceptable, it is possible | |
to share data between nodes in an Akka Cluster and accept both reads and | |
writes even in the face of cluster partitions. This can be achieved | |
using Conflict Free Replicated Data Types (CRDTs), where writes on | |
different nodes can happen concurrently and are merged in a predictable | |
way afterward. The Distributed Data module provides infrastructure to | |
share data and a number of useful data types. | |
Distributed Data is intended to solve the following challenges: | |
- How to accept writes even in the face of cluster partitions. | |
- How to share data while at the same time ensuring low-latency local | |
read and write access. | |
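
As an illustration of how concurrent writes can be merged predictably,
the following plain Java sketch implements a grow-only counter, one of
the simplest CRDTs. It is not the Distributed Data module's API; the
class name GCounter and the node identifiers are assumptions made for
this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical grow-only counter: each node only increments its own slot.
final class GCounter {
    private final Map<String, Long> counts = new HashMap<>();

    // A local write: bump the slot that belongs to the given node.
    void increment(String nodeId) {
        counts.merge(nodeId, 1L, Long::sum);
    }

    // The counter value is the sum over all node slots.
    long value() {
        long sum = 0;
        for (long count : counts.values()) {
            sum += count;
        }
        return sum;
    }

    // Merge takes the per-node maximum, so it is commutative,
    // associative and idempotent: replicas converge in any merge order.
    GCounter merge(GCounter other) {
        GCounter result = new GCounter();
        result.counts.putAll(this.counts);
        other.counts.forEach((node, count) -> result.counts.merge(node, count, Long::max));
        return result;
    }

    public static void main(String[] args) {
        GCounter a = new GCounter();
        GCounter b = new GCounter();
        a.increment("node-a");
        a.increment("node-a");
        b.increment("node-b"); // concurrent write on another replica
        System.out.println(a.merge(b).value()); // prints 3
        System.out.println(b.merge(a).value()); // prints 3
    }
}
```

Both replicas accept writes without coordination, which is what allows
writes to continue during a partition.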
Streams | |
Actors are a fundamental model for concurrency, but there are common | |
patterns where their use requires the user to implement the same pattern | |
over and over. Very common is the scenario where a chain, or graph, of
actors needs to process a potentially large, or infinite, stream of
sequential events and properly coordinate resource usage so that faster
processing stages do not overwhelm slower ones in the chain or graph.
Streams provide a higher-level abstraction on top of actors that | |
simplifies writing such processing networks, handling all the fine | |
details in the background and providing a safe, typed, composable | |
programming model. Streams is also an implementation of the Reactive | |
Streams standard which enables integration with all third party | |
implementations of that standard. | |
Streams solve the following challenges: | |
- How to handle streams of events or large datasets with high
performance, exploiting concurrency and keeping resource usage tight.
- How to assemble reusable pieces of event/data processing into | |
flexible pipelines. | |
- How to connect asynchronous services in a flexible way to each | |
other, and have good performance. | |
- How to provide or consume Reactive Streams compliant interfaces to | |
interface with a third party library. | |
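
The resource coordination described above rests on the consumer
signalling how many elements it is ready for. The sketch below shows that
idea with the java.util.concurrent.Flow interfaces (Java 9 and later),
which mirror the Reactive Streams interfaces. It is not Akka Streams
code: RangePublisher and SlowSubscriber are hypothetical names, and the
publisher is a simplified synchronous one that skips the validation a
compliant implementation needs (for example, rejecting non-positive
demand).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher of 0..count-1 that emits only on demand.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n elements: demand bounds the producer.
                for (long i = 0; i < n && !done; i++) {
                    if (next < count) {
                        subscriber.onNext(next++);
                    }
                    if (next >= count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Hypothetical subscriber that asks for elements only when it is ready.
final class SlowSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed = false;
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable error) {
        throw new RuntimeException(error);
    }

    @Override
    public void onComplete() {
        completed = true;
    }

    // Signal demand for n more elements.
    void pull(long n) {
        subscription.request(n);
    }

    public static void main(String[] args) {
        SlowSubscriber subscriber = new SlowSubscriber();
        new RangePublisher(5).subscribe(subscriber);
        subscriber.pull(2);
        System.out.println(subscriber.received); // prints [0, 1]
    }
}
```

Nothing is emitted until the subscriber asks, so a fast producer cannot
fill the memory of a slow consumer.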
HTTP | |
The de facto standard for providing APIs remotely, internal or external, | |
is HTTP. Akka provides a library to construct or consume such HTTP | |
services by giving a set of tools to create HTTP services (and serve | |
them) and a client that can be used to consume other services. These | |
tools are particularly suited to streaming in and out a large set of | |
data or real-time events by leveraging the underlying model of Akka | |
Streams. | |
Some of the challenges that HTTP tackles: | |
- How to expose services of a system or cluster to the external world | |
via an HTTP API in a performant way. | |
- How to stream large datasets in and out of a system using HTTP. | |
- How to stream live events in and out of a system using HTTP. | |
Example of module use | |
Akka modules integrate together seamlessly. For example, think of a | |
large set of stateful business objects, such as documents or shopping | |
carts, that website users access. If you model these as sharded | |
entities, using Sharding and Persistence, they will be balanced across a | |
cluster that you can scale out on-demand. They will remain available
during spikes that come from advertising campaigns or the run-up to
holidays, even if some systems crash. You can also easily take the
real-time stream of domain events with Persistence Query and use Streams | |
to pipe them into a streaming Fast Data engine. Then, take the output of | |
that engine as a Stream, manipulate it using Akka Streams operators and | |
expose it as web socket connections served by a load balanced set of | |
HTTP servers hosted by your cluster to power your real-time business | |
analytics tool. | |
We hope this preview caught your interest! The next topic introduces the | |
example application we will build in the tutorial portion of this guide. | |
INTRODUCTION TO THE EXAMPLE | |
When writing prose, the hardest part is often composing the first few | |
sentences. There is a similar “blank canvas” feeling when starting to | |
build an Akka system. You might wonder: Which should be the first actor? | |
Where should it live? What should it do? Fortunately — unlike with prose | |
— established best practices can guide us through these initial steps. | |
In the remainder of this guide, we examine the core logic of a simple | |
Akka application to introduce you to actors and show you how to | |
formulate solutions with them. The example demonstrates common patterns | |
that will help you kickstart your Akka projects. | |
Prerequisites | |
You should have already followed the instructions in the Akka Quickstart
with Scala guide or the Akka Quickstart with Java guide to download and
run the
Hello World example. You will use this as a seed project and add the | |
functionality described in this tutorial. | |
IoT example use case | |
In this tutorial, we’ll use Akka to build out part of an Internet of | |
Things (IoT) system that reports data from sensor devices installed in | |
customers’ homes. The example focuses on temperature readings. The | |
target use case simply allows customers to log in and view the last | |
reported temperature from different areas of their homes. You can | |
imagine that such sensors could also collect relative humidity or other | |
interesting data and an application would likely support reading and | |
changing device configuration, maybe even alerting home owners when | |
sensor state falls outside of a particular range. | |
In a real system, the application would be exposed to customers through | |
a mobile app or browser. This guide concentrates only on the core logic | |
for storing temperatures that would be called over a network protocol,
such as HTTP. It also includes writing tests to help you get comfortable | |
and proficient with testing actors. | |
The tutorial application consists of two main components: | |
- DEVICE DATA COLLECTION: maintains a local representation of the
remote devices. Multiple sensor devices for a home are organized
into one device group.
- USER DASHBOARD: periodically collects data from the devices for a
logged in user’s home and presents the results as a report.
The following diagram illustrates the example application architecture. | |
Since we are interested in the state of each sensor device, we will | |
model devices as actors. The running application will create as many | |
instances of device actors and device groups as necessary. | |
[box diagram of the architecture] | |
What you will learn in this tutorial | |
This tutorial introduces and illustrates: | |
- The actor hierarchy and how it influences actor behavior | |
- How to choose the right granularity for actors | |
- How to define protocols as messages | |
- Typical conversational styles | |
Let’s get started by learning more about actors. | |
PART 1: ACTOR ARCHITECTURE | |
Use of Akka relieves you from creating the infrastructure for an actor | |
system and from writing the low-level code necessary to control basic | |
behavior. To appreciate this, let’s look at the relationships between | |
actors you create in your code and those that Akka creates and manages | |
for you internally, the actor lifecycle, and failure handling. | |
The Akka actor hierarchy | |
An actor in Akka always belongs to a parent. Typically, you create an
actor by calling context.actorOf() in Scala or getContext().actorOf() in
Java. Rather than creating a “freestanding” actor, this injects the new
actor as a child
into an already existing tree: the creator actor becomes the _parent_ of | |
the newly created _child_ actor. You might ask then, who is the parent | |
of the _first_ actor you create? | |
As illustrated below, all actors have a common parent, the user | |
guardian. New actor instances can be created under this actor using | |
system.actorOf(). As we covered in the Quickstart Guide,
creation of an actor returns a reference that is a valid URL. So, for | |
example, if we create an actor named someActor with | |
system.actorOf(…, "someActor"), its reference will include the path | |
/user/someActor. | |
[box diagram of the architecture] | |
In fact, before you create an actor in your code, Akka has already | |
created three actors in the system. The names of these built-in actors | |
contain _guardian_ because they _supervise_ every child actor in their | |
path. The guardian actors include: | |
- / the so-called _root guardian_. This is the parent of all actors in | |
the system, and the last one to stop when the system itself is | |
terminated. | |
- /user the _guardian_. THIS IS THE PARENT ACTOR FOR ALL USER CREATED | |
ACTORS. Don’t let the name user confuse you, it has nothing to do | |
with end users, nor with user handling. Every actor you create using | |
the Akka library will have the constant path /user/ prepended to it. | |
- /system the _system guardian_. | |
In the Hello World example, we have already seen how system.actorOf()
creates an actor directly under /user. We call this a _top level_ actor,
even though, in practice, it is only on the top of the _user defined_
hierarchy. You typically have only one (or very few) top level actors in | |
your ActorSystem. We create child, or non-top-level, actors by invoking | |
context.actorOf() from an existing actor. The context.actorOf() method | |
has a signature identical to system.actorOf(), its top-level | |
counterpart. | |
The easiest way to see the actor hierarchy in action is to simply print | |
ActorRef instances. In this small experiment, we create an actor, print | |
its reference, create a child of this actor, and print the child’s | |
reference. We start with the Hello World project. If you have not
downloaded it, download the Quickstart project from the Lightbend Tech
Hub.
In your Hello World project, navigate to the com.lightbend.akka.sample | |
package and create a new Scala file called
ActorHierarchyExperiments.scala or a new Java file called
ActorHierarchyExperiments.java here. Copy and paste the code from the
snippet below to this new source file. Save your file and run | |
sbt "runMain com.lightbend.akka.sample.ActorHierarchyExperiments" to | |
observe the output. | |
Scala | |
package com.lightbend.akka.sample

import akka.actor.{ Actor, ActorSystem, Props }
import scala.io.StdIn

class PrintMyActorRefActor extends Actor {
  override def receive: Receive = {
    case "printit" =>
      // Creating an actor through the context makes it a child of this actor
      val secondRef = context.actorOf(Props.empty, "second-actor")
      println(s"Second: $secondRef")
  }
}

object ActorHierarchyExperiments extends App {
  // Name the system so that the printed paths match the sample output
  val system = ActorSystem("testSystem")

  val firstRef = system.actorOf(Props[PrintMyActorRefActor], "first-actor")
  println(s"First: $firstRef")
  firstRef ! "printit"

  println(">>> Press ENTER to exit <<<")
  try StdIn.readLine()
  finally system.terminate()
}
Java | |
package com.lightbend.akka.sample;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

class PrintMyActorRefActor extends AbstractActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("printit", p -> {
          // Creating an actor through the context makes it a child of this actor
          ActorRef secondRef = getContext().actorOf(Props.empty(), "second-actor");
          System.out.println("Second: " + secondRef);
        })
        .build();
  }
}

public class ActorHierarchyExperiments {
  public static void main(String[] args) throws java.io.IOException {
    // Name the system so that the printed paths match the sample output
    ActorSystem system = ActorSystem.create("testSystem");

    ActorRef firstRef = system.actorOf(Props.create(PrintMyActorRefActor.class), "first-actor");
    System.out.println("First: " + firstRef);
    firstRef.tell("printit", ActorRef.noSender());

    System.out.println(">>> Press ENTER to exit <<<");
    try {
      System.in.read();
    } finally {
      system.terminate();
    }
  }
}
Note the way a message asked the first actor to do its work. We sent the | |
message by using the parent’s reference: firstRef ! "printit" in Scala
or firstRef.tell("printit", ActorRef.noSender()) in Java. When
the code executes, the output includes the references for the first | |
actor and the child it created as part of the printit case. Your output | |
should look similar to the following: | |
First: Actor[akka://testSystem/user/first-actor#1053618476] | |
Second: Actor[akka://testSystem/user/first-actor/second-actor#-1544706041] | |
Notice the structure of the references: | |
- Both paths start with akka://testSystem/. Since all actor references | |
are valid URLs, akka:// is the value of the protocol field. | |
- Next, just like on the World Wide Web, the URL identifies the | |
system. In this example, the system is named testSystem, but it | |
could be any other name. If remote communication between multiple | |
systems is enabled, this part of the URL includes the hostname so | |
other systems can find it on the network. | |
- Because the second actor’s reference includes the path | |
/first-actor/, it identifies it as a child of the first. | |
- The last part of the actor reference, #1053618476 or #-1544706041 is | |
a unique identifier that you can ignore in most cases. | |
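
Because the printed references follow URL syntax, the parts listed above
can be picked apart with the standard java.net.URI class. This is only a
sketch for exploring the structure: the input string is the part inside
Actor[...] from the sample output, and ActorPathParts and parentPath are
hypothetical helper names, not part of Akka.

```java
import java.net.URI;

// Hypothetical helper for inspecting the text form of an actor reference.
final class ActorPathParts {
    // Returns the parent's path, e.g. "/user/first-actor" for
    // "/user/first-actor/second-actor".
    static String parentPath(URI ref) {
        String path = ref.getPath();
        return path.substring(0, path.lastIndexOf('/'));
    }

    public static void main(String[] args) {
        URI ref = URI.create(
            "akka://testSystem/user/first-actor/second-actor#-1544706041");
        System.out.println("protocol: " + ref.getScheme());    // akka
        System.out.println("system:   " + ref.getAuthority()); // testSystem
        System.out.println("path:     " + ref.getPath());      // /user/first-actor/second-actor
        System.out.println("parent:   " + parentPath(ref));    // /user/first-actor
        System.out.println("id:       " + ref.getFragment());  // -1544706041
    }
}
```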
Now that you understand what the actor hierarchy looks like, you might | |
be wondering: _Why do we need this hierarchy? What is it used for?_ | |
An important role of the hierarchy is to safely manage actor lifecycles. | |
Let’s consider this next and see how that knowledge can help us write | |
better code. | |
The actor lifecycle | |
Actors pop into existence when created, then later, at user requests, | |
they are stopped. Whenever an actor is stopped, all of its children are | |
_recursively stopped_ too. This behavior greatly simplifies resource | |
cleanup and helps avoid resource leaks such as those caused by open | |
sockets and files. In fact, a commonly overlooked difficulty when | |
dealing with low-level multi-threaded code is the lifecycle management | |
of various concurrent resources. | |
To stop an actor, the recommended pattern is to call context.stop(self)
in Scala or getContext().stop(getSelf()) in Java inside the actor to
stop itself, usually as a response to some user defined stop message or
when the actor is done with its job. Stopping another actor is
technically possible by calling context.stop(actorRef) in Scala or
getContext().stop(actorRef) in Java, but IT IS CONSIDERED A BAD PRACTICE
TO STOP ARBITRARY ACTORS THIS WAY: try sending them a PoisonPill or
custom stop message instead.
The Akka actor API exposes many lifecycle hooks that you can override in | |
an actor implementation. The most commonly used are preStart() and | |
postStop(). | |
- preStart() is invoked after the actor has started but before it | |
processes its first message. | |
- postStop() is invoked just before the actor stops. No messages are | |
processed after this point. | |
Let’s use the preStart() and postStop() lifecycle hooks in a simple | |
experiment to observe the behavior when we stop an actor. First, add the | |
following 2 actor classes to your project: | |
Scala | |
class StartStopActor1 extends Actor {
  override def preStart(): Unit = {
    println("first started")
    context.actorOf(Props[StartStopActor2], "second")
  }
  override def postStop(): Unit = println("first stopped")

  override def receive: Receive = {
    case "stop" => context.stop(self)
  }
}

class StartStopActor2 extends Actor {
  override def preStart(): Unit = println("second started")
  override def postStop(): Unit = println("second stopped")

  // Actor.emptyBehavior is a useful placeholder when we don't
  // want to handle any messages in the actor.
  override def receive: Receive = Actor.emptyBehavior
}
Java | |
class StartStopActor1 extends AbstractActor {
  @Override
  public void preStart() {
    System.out.println("first started");
    getContext().actorOf(Props.create(StartStopActor2.class), "second");
  }

  @Override
  public void postStop() {
    System.out.println("first stopped");
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("stop", s -> {
          getContext().stop(getSelf());
        })
        .build();
  }
}

class StartStopActor2 extends AbstractActor {
  @Override
  public void preStart() {
    System.out.println("second started");
  }

  @Override
  public void postStop() {
    System.out.println("second stopped");
  }

  // An empty receive builder is a useful placeholder when we don't
  // want to handle any messages in the actor.
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .build();
  }
}
And create a ‘main’ class like above to start the actors and then send | |
them a "stop" message: | |
Scala | |
val first = system.actorOf(Props[StartStopActor1], "first") | |
first ! "stop" | |
Java | |
ActorRef first = system.actorOf(Props.create(StartStopActor1.class), "first"); | |
first.tell("stop", ActorRef.noSender()); | |
You can again use sbt to start this program. The output should look like | |
this: | |
first started | |
second started | |
second stopped | |
first stopped | |
When we stopped actor first, it stopped its child actor, second, before | |
stopping itself. This ordering is strict: _all_ postStop() hooks of the
children are called before the postStop() hook of the parent is called. | |
The Actor Lifecycle section of the Akka reference manual provides | |
details on the full set of lifecycle hooks.
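
The stop ordering can be modelled without Akka as a depth-first walk over
a tree. The following plain Java sketch is only an analogy: LifecycleNode
is a hypothetical class, and it runs synchronously on one thread, whereas
real actors stop asynchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an actor that owns its children.
final class LifecycleNode {
    private final String name;
    private final List<String> log;
    private final List<LifecycleNode> children = new ArrayList<>();

    LifecycleNode(String name, List<String> log) {
        this.name = name;
        this.log = log;
        log.add(name + " started");
    }

    // Children are always created through their parent.
    LifecycleNode spawn(String childName) {
        LifecycleNode child = new LifecycleNode(childName, log);
        children.add(child);
        return child;
    }

    // Stop every child first, then record this node as stopped.
    void stop() {
        for (LifecycleNode child : children) {
            child.stop();
        }
        children.clear();
        log.add(name + " stopped");
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        LifecycleNode first = new LifecycleNode("first", log);
        first.spawn("second");
        first.stop();
        // Prints the same four lines, in the same order, as the actor example
        log.forEach(System.out::println);
    }
}
```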
Failure handling | |
Parents and children are connected throughout their lifecycles. Whenever | |
an actor fails (throws an exception or an unhandled exception bubbles | |
out from receive) it is temporarily suspended. As mentioned earlier, the | |
failure information is propagated to the parent, which then decides how | |
to handle the exception caused by the child actor. In this way, parents | |
act as supervisors for their children. The default _supervisor strategy_ | |
is to stop and restart the child. If you don’t change the default
strategy, all failures result in a restart.
Let’s observe the default strategy in a simple experiment. Add the | |
following classes to your project, just as you did with the previous | |
ones: | |
Scala | |
class SupervisingActor extends Actor {
  val child = context.actorOf(Props[SupervisedActor], "supervised-actor")

  override def receive: Receive = {
    case "failChild" => child ! "fail"
  }
}

class SupervisedActor extends Actor {
  override def preStart(): Unit = println("supervised actor started")
  override def postStop(): Unit = println("supervised actor stopped")

  override def receive: Receive = {
    case "fail" =>
      println("supervised actor fails now")
      throw new Exception("I failed!")
  }
}
Java | |
class SupervisingActor extends AbstractActor {
  ActorRef child = getContext().actorOf(Props.create(SupervisedActor.class), "supervised-actor");

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("failChild", f -> {
          child.tell("fail", getSelf());
        })
        .build();
  }
}

class SupervisedActor extends AbstractActor {
  @Override
  public void preStart() {
    System.out.println("supervised actor started");
  }

  @Override
  public void postStop() {
    System.out.println("supervised actor stopped");
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .matchEquals("fail", f -> {
          System.out.println("supervised actor fails now");
          throw new Exception("I failed!");
        })
        .build();
  }
}
And run with: | |
Scala | |
val supervisingActor = system.actorOf(Props[SupervisingActor], "supervising-actor") | |
supervisingActor ! "failChild" | |
Java | |
ActorRef supervisingActor = system.actorOf(Props.create(SupervisingActor.class), "supervising-actor"); | |
supervisingActor.tell("failChild", ActorRef.noSender()); | |
You should see output similar to the following: | |
supervised actor started | |
supervised actor fails now | |
supervised actor stopped | |
supervised actor started | |
[ERROR] [03/29/2017 10:47:14.150] [testSystem-akka.actor.default-dispatcher-2] [akka://testSystem/user/supervising-actor/supervised-actor] I failed! | |
java.lang.Exception: I failed! | |
at tutorial_1.SupervisedActor$$anonfun$receive$4.applyOrElse(ActorHierarchyExperiments.scala:57) | |
at akka.actor.Actor$class.aroundReceive(Actor.scala:513) | |
at tutorial_1.SupervisedActor.aroundReceive(ActorHierarchyExperiments.scala:47) | |
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:519) | |
at akka.actor.ActorCell.invoke(ActorCell.scala:488) | |
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) | |
at akka.dispatch.Mailbox.run(Mailbox.scala:224) | |
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) | |
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) | |
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) | |
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) | |
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) | |
We see that after failure the supervised actor is stopped and | |
immediately restarted. We also see a log entry reporting the exception | |
that was handled, in this case, our test exception. In this example we | |
used preStart() and postStop() hooks which are the default to be called | |
after and before restarts, so we cannot distinguish from inside the | |
actor whether it was started for the first time or restarted. This is
usually the right thing to do: the purpose of the restart is to set the
actor in a known-good state, which usually means a clean starting stage.
WHAT ACTUALLY HAPPENS THOUGH IS THAT THE preRestart() AND postRestart() | |
METHODS ARE CALLED WHICH, IF NOT OVERRIDDEN, BY DEFAULT DELEGATE TO | |
postStop() AND preStart() RESPECTIVELY. You can experiment with | |
overriding these additional methods and see how the output changes. | |
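
The delegation between the restart hooks and the start/stop hooks can be
pictured with a small plain Java model. This is an analogy rather than
Akka code: HookedComponent, RestartAwareComponent and RestartSketch are
hypothetical names, and the model leaves out everything else a restart
involves, such as the handling of child actors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class whose restart hooks delegate by default.
class HookedComponent {
    protected final List<String> log;

    HookedComponent(List<String> log) {
        this.log = log;
    }

    void preStart() { log.add("started"); }
    void postStop() { log.add("stopped"); }

    // Default delegation: a restart looks like a stop followed by a start.
    void preRestart(Throwable reason) { postStop(); }
    void postRestart(Throwable reason) { preStart(); }
}

// Overriding postRestart lets the new instance tell a restart from a start.
final class RestartAwareComponent extends HookedComponent {
    RestartAwareComponent(List<String> log) {
        super(log);
    }

    @Override
    void postRestart(Throwable reason) {
        log.add("restarted after: " + reason.getMessage());
    }
}

final class RestartSketch {
    // Models a restart: the failed instance is told first, then the fresh one.
    static void restart(HookedComponent failed, HookedComponent fresh, Throwable reason) {
        failed.preRestart(reason);
        fresh.postRestart(reason);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        HookedComponent first = new HookedComponent(log);
        first.preStart();
        restart(first, new HookedComponent(log), new Exception("I failed!"));
        System.out.println(log); // prints [started, stopped, started]
    }
}
```

With the default hooks the log cannot distinguish a restart from a fresh
start; swapping in RestartAwareComponent for the fresh instance makes the
difference visible.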
For the impatient, we also recommend looking into the supervision | |
reference page for more in-depth details. | |
SUMMARY | |
We’ve learned about how Akka manages actors in hierarchies where parents | |
supervise their children and handle exceptions. We saw how to create a | |
very simple actor and child. Next, we’ll apply this knowledge to our | |
example use case by modeling the communication necessary to get | |
information from device actors. Later, we’ll deal with how to manage the | |
actors in groups. | |
PART 2: CREATING THE FIRST ACTOR | |
With an understanding of actor hierarchy and behavior, the remaining | |
question is how to map the top-level components of our IoT system to | |
actors. It might be tempting to make the actors that represent devices
and dashboards top-level actors. Instead, we recommend creating an
explicit component that represents the whole application. In other | |
words, we will have a single top-level actor in our IoT system. The | |
components that create and manage devices and dashboards will be | |
children of this actor. This allows us to refactor the example use case | |
architecture diagram into a tree of actors: | |
[actor tree diagram of the architecture] | |
We can define the first actor, the IotSupervisor, with a few simple | |
lines of code. To start your tutorial application: | |
1. Create a new IotSupervisor source file in the | |
com.lightbend.akka.sample package. | |
2. Paste the following code into the new file to define the | |
IotSupervisor. | |
Scala | |
package com.lightbend.akka.sample

import akka.actor.{ Actor, ActorLogging, Props }

object IotSupervisor {
  def props(): Props = Props(new IotSupervisor)
}

class IotSupervisor extends Actor with ActorLogging {
  override def preStart(): Unit = log.info("IoT Application started")
  override def postStop(): Unit = log.info("IoT Application stopped")

  // No need to handle any messages
  override def receive = Actor.emptyBehavior
}
Java | |
package com.lightbend.akka.sample;

import akka.actor.AbstractActor;
import akka.actor.ActorLogging;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class IotSupervisor extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  public static Props props() {
    return Props.create(IotSupervisor.class);
  }

  @Override
  public void preStart() {
    log.info("IoT Application started");
  }

  @Override
  public void postStop() {
    log.info("IoT Application stopped");
  }

  // No need to handle any messages
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .build();
  }
}
The code is similar to the actor examples we used in the previous | |
experiments, but notice: | |
- Instead of println() we use Akka’s built-in logging facility, through
the ActorLogging helper trait in Scala or akka.event.Logging in Java.
- We use the recommended pattern for creating actors by defining a
props() method, in the companion object in Scala or as a static method
on the actor in Java.
To provide the main entry point that creates the actor system, add the
following code to a new IotApp object (Scala) or IotMain class (Java).
Scala | |
package com.lightbend.akka.sample

import akka.actor.ActorSystem
import scala.io.StdIn

object IotApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("iot-system")

    try {
      // Create top level supervisor
      val supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor")
      // Exit the system after ENTER is pressed
      StdIn.readLine()
    } finally {
      system.terminate()
    }
  }
}
Java | |
package com.lightbend.akka.sample;

import java.io.IOException;

import akka.actor.ActorSystem;
import akka.actor.ActorRef;

public class IotMain {
  public static void main(String[] args) throws IOException {
    ActorSystem system = ActorSystem.create("iot-system");

    try {
      // Create top level supervisor
      ActorRef supervisor = system.actorOf(IotSupervisor.props(), "iot-supervisor");

      System.out.println("Press ENTER to exit the system");
      System.in.read();
    } finally {
      system.terminate();
    }
  }
}
The application does little, other than print out that it is started. | |
But, we have the first actor in place and we are ready to add other | |
actors. | |
What’s next? | |
In the following chapters we will grow the application gradually, by: | |
1. Creating the representation for a device. | |
2. Creating the device management component. | |
3. Adding query capabilities to device groups. | |
PART 3: WORKING WITH DEVICE ACTORS | |
In the previous topics we explained how to view actor systems _in the | |
large_, that is, how components should be represented, how actors should | |
be arranged in the hierarchy. In this part, we will look at actors _in | |
the small_ by implementing the device actor. | |
If we were working with objects, we would typically design the API as | |
_interfaces_, a collection of abstract methods to be filled out by the | |
actual implementation. In the world of actors, protocols take the place | |
of interfaces. While it is not possible to formalize general protocols | |
in the programming language, we can compose their most basic element, | |
messages. So, we will start by identifying the messages we will want to | |
send to device actors. | |
Typically, messages fall into categories, or patterns. By identifying | |
these patterns, you will find that it becomes easier to choose between | |
them and to implement them. The first example demonstrates the | |
_request-respond_ message pattern. | |
Identifying messages for devices | |
The tasks of a device actor will be simple: | |
- Collect temperature measurements | |
- When asked, report the last measured temperature | |
However, a device might start without immediately having a temperature | |
measurement. Hence, we need to account for the case where a temperature | |
is not present. This also allows us to test the query part of the actor | |
without the write part present, as the device actor can simply report an | |
empty result. | |
The protocol for obtaining the current temperature from the device actor | |
is simple. The actor: | |
1. Waits for a request for the current temperature. | |
2. Responds to the request with a reply that either: | |
- contains the current temperature or, | |
- indicates that a temperature is not yet available. | |
We need two messages, one for the request, and one for the reply. Our | |
first attempt might look like the following: | |
Scala | |
final case object ReadTemperature | |
final case class RespondTemperature(value: Option[Double]) | |
Java | |
public static final class ReadTemperature {
}

public static final class RespondTemperature {
  final Optional<Double> value;

  public RespondTemperature(Optional<Double> value) {
    this.value = value;
  }
}
These two messages seem to cover the required functionality. However, | |
the approach we choose must take into account the distributed nature of | |
the application. While the basic mechanism is the same for communicating | |
with an actor on the local JVM as with a remote actor, we need to keep | |
the following in mind: | |
- There will be observable differences in the latency of delivery | |
between local and remote messages, because factors like network link | |
bandwidth and the message size also come into play. | |
- Reliability is a concern because a remote message send involves more | |
steps, which means that more can go wrong. | |
- A local send will just pass a reference to the message inside the | |
same JVM, without any restrictions on the underlying object which is | |
sent, whereas a remote transport will place a limit on the message | |
size. | |
In addition, while sending inside the same JVM is significantly more | |
reliable, if an actor fails due to a programmer error while processing | |
the message, the effect is basically the same as if a remote network | |
request fails due to the remote host crashing while processing the | |
message. Even though in both cases, the service recovers after a while | |
(the actor is restarted by its supervisor, the host is restarted by an | |
operator or by a monitoring system) individual requests are lost during | |
the crash. THEREFORE, WRITING YOUR ACTORS SUCH THAT EVERY MESSAGE COULD | |
POSSIBLY BE LOST IS THE SAFE, PESSIMISTIC BET. | |
But to further understand the need for flexibility in the protocol, it | |
will help to consider Akka message ordering and message delivery | |
guarantees. Akka provides the following behavior for message sends: | |
- At-most-once delivery, that is, no guaranteed delivery. | |
- Message ordering is maintained per sender, receiver pair. | |
The following sections discuss this behavior in more detail: | |
- Message delivery | |
- Message ordering | |
Message delivery | |
The delivery semantics provided by messaging subsystems typically fall | |
into the following categories: | |
- AT-MOST-ONCE DELIVERY — each message is delivered zero or one time; | |
in more casual terms it means that messages can be lost, but are
never duplicated. | |
- AT-LEAST-ONCE DELIVERY — potentially multiple attempts are made to | |
deliver each message, until at least one succeeds; again, in more | |
casual terms this means that messages can be duplicated but are
never lost. | |
- EXACTLY-ONCE DELIVERY — each message is delivered exactly once to | |
the recipient; the message can neither be lost nor be duplicated. | |
The first behavior, the one used by Akka, is the cheapest and results in | |
the highest performance. It has the least implementation overhead | |
because it can be done in a fire-and-forget fashion without keeping the | |
state at the sending end or in the transport mechanism. The second, | |
at-least-once, requires retries to counter transport losses. This adds | |
the overhead of keeping the state at the sending end and having an | |
acknowledgment mechanism at the receiving end. Exactly-once delivery is | |
most expensive, and results in the worst performance: in addition to the | |
overhead added by at-least-once delivery, it requires the state to be | |
kept at the receiving end in order to filter out duplicate deliveries. | |
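To make the extra receiver-side state concrete, here is a minimal plain-Java sketch of duplicate filtering. It is not part of the Akka API: the class name DeduplicatingReceiver and its methods are hypothetical, and it assumes the sender attaches a unique ID to every message and may resend the same ID.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical receiver that tolerates redelivery from a retrying sender.
final class DeduplicatingReceiver {
    // State kept at the receiving end: IDs of messages already processed.
    private final Set<Long> seenIds = new HashSet<>();
    private int processedCount = 0;

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean receive(long messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // already seen: a retry of an earlier delivery, so drop it
        }
        processedCount++; // stand-in for the real message handling
        return true;
    }

    int processedCount() {
        return processedCount;
    }
}
```

Note that the set of seen IDs grows without bound in this sketch. A real implementation would need some way to expire old IDs, which is part of why this level of guarantee is the most expensive.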
In an actor system, we need to determine the exact meaning of a
guarantee, that is, at which point the system considers the delivery
accomplished:
1. When the message is sent out on the network? | |
2. When the message is received by the target actor’s host? | |
3. When the message is put into the target actor’s mailbox? | |
4. When the target actor starts to process the message?
5. When the target actor has successfully processed the message? | |
Most frameworks and protocols that claim guaranteed delivery actually | |
provide something similar to points 4 and 5. While this sounds | |
reasonable, IS IT ACTUALLY USEFUL? To understand the implications, | |
consider a simple, practical example: a user attempts to place an order | |
and we only want to claim that it has been successfully processed once it is
actually on disk in the orders database. | |
If we rely on the successful processing of the message, the actor will | |
report success as soon as the order has been submitted to the internal | |
API that has the responsibility to validate it, process it and put it | |
into the database. Unfortunately, immediately after the API has been | |
invoked, any of the following can happen:
- The host can crash. | |
- Deserialization can fail. | |
- Validation can fail. | |
- The database might be unavailable. | |
- A programming error might occur. | |
This illustrates that the GUARANTEE OF DELIVERY does not translate to | |
the DOMAIN LEVEL GUARANTEE. We only want to report success once the | |
order has been actually fully processed and persisted. THE ONLY ENTITY | |
THAT CAN REPORT SUCCESS IS THE APPLICATION ITSELF, SINCE ONLY IT HAS ANY | |
UNDERSTANDING OF THE DOMAIN GUARANTEES REQUIRED. NO GENERALIZED | |
FRAMEWORK CAN FIGURE OUT THE SPECIFICS OF A PARTICULAR DOMAIN AND WHAT | |
IS CONSIDERED A SUCCESS IN THAT DOMAIN. | |
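As a rough illustration of an application-level guarantee, the following plain-Java sketch acknowledges an order only after the storage layer has confirmed the write. The names OrderStore and OrderHandler, and the acknowledgement string, are assumptions made for this example. They are not taken from Akka or from the tutorial code.

```java
import java.util.Optional;

// Hypothetical storage abstraction; a real one would talk to the orders database.
interface OrderStore {
    void persist(String order) throws Exception; // throws if the write fails
}

final class OrderHandler {
    private final OrderStore store;

    OrderHandler(OrderStore store) {
        this.store = store;
    }

    /** Returns an acknowledgement only after the store has confirmed the write. */
    Optional<String> handle(long requestId, String order) {
        try {
            store.persist(order);
            return Optional.of("OrderStored(" + requestId + ")");
        } catch (Exception e) {
            // No acknowledgement is produced: the requester sees no success
            // signal and can decide to retry.
            return Optional.empty();
        }
    }
}
```

The design choice here is that success is defined by the domain (the order is stored), not by the fact that a message reached the handler.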
In this particular example, we only want to signal success after a | |
successful database write, where the database acknowledged that the | |
order is now safely stored. FOR THESE REASONS AKKA LIFTS THE | |
RESPONSIBILITIES OF GUARANTEES TO THE APPLICATION ITSELF, I.E. YOU HAVE | |
TO IMPLEMENT THEM YOURSELF. THIS GIVES YOU FULL CONTROL OF THE | |
GUARANTEES THAT YOU WANT TO PROVIDE. Now, let’s consider the message | |
ordering that Akka provides to make it easy to reason about application | |
logic. | |
Message Ordering | |
In Akka, for a given pair of actors, messages sent directly from the | |
first to the second will not be received out-of-order. The word directly | |
emphasizes that this guarantee only applies when sending with the tell | |
operator directly to the final destination, but not when employing | |
mediators. | |
If: | |
- Actor A1 sends messages M1, M2, M3 to A2. | |
- Actor A3 sends messages M4, M5, M6 to A2. | |
This means that, for Akka messages: | |
- If M1 is delivered it must be delivered before M2 and M3. | |
- If M2 is delivered it must be delivered before M3. | |
- If M4 is delivered it must be delivered before M5 and M6. | |
- If M5 is delivered it must be delivered before M6. | |
- A2 can see messages from A1 interleaved with messages from A3. | |
- Since there is no guaranteed delivery, any of the messages may be | |
dropped, i.e. not arrive at A2. | |
These guarantees strike a good balance: having messages from one actor | |
arrive in-order is convenient for building systems that can be easily | |
reasoned about, while on the other hand allowing messages from different | |
actors to arrive interleaved provides sufficient freedom for an | |
efficient implementation of the actor system. | |
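The rules above can be expressed as a small check over an observed arrival sequence. This is a plain-Java sketch with hypothetical names (Delivered, OrderingCheck). It does not use Akka and only models the guarantee: per sender, arrivals must follow send order, gaps from lost messages are allowed, and messages from different senders may interleave freely.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A delivered message, tagged with its sender and its position in that sender's send order.
final class Delivered {
    final String sender;
    final int sendIndex;

    Delivered(String sender, int sendIndex) {
        this.sender = sender;
        this.sendIndex = sendIndex;
    }
}

final class OrderingCheck {
    /** True if, for every sender, messages arrive in increasing send order (gaps allowed). */
    static boolean respectsPerSenderOrder(List<Delivered> arrivals) {
        Map<String, Integer> lastSeen = new HashMap<>();
        for (Delivered d : arrivals) {
            Integer previous = lastSeen.put(d.sender, d.sendIndex);
            if (previous != null && previous >= d.sendIndex) {
                return false; // a later message overtook an earlier one from the same sender
            }
        }
        return true;
    }
}
```

For example, the arrival sequence M1, M4, M3, M5, M6 (with M2 lost) satisfies the check, while M2 followed by M1 from the same sender does not.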
For the full details on delivery guarantees please refer to the | |
reference page. | |
Adding flexibility to device messages | |
Our first query protocol was correct, but did not take into account | |
distributed application execution. If we want to implement resends in | |
the actor that queries a device actor (because of timed out requests), | |
or if we want to query multiple actors, we need to be able to correlate | |
requests and responses. Hence, we add one more field to our messages, so | |
that an ID can be provided by the requester (we will add this code to | |
our app in a later step): | |
Scala | |
final case class ReadTemperature(requestId: Long) | |
final case class RespondTemperature(requestId: Long, value: Option[Double]) | |
Java | |
// Message fields are final so that message instances are immutable.
public static final class ReadTemperature {
final long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
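To show why the request ID helps, here is a minimal plain-Java sketch of the bookkeeping a requester might keep in order to match responses to outstanding queries. The class PendingQueries and its methods are hypothetical and are not part of the tutorial application or of Akka.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical requester-side bookkeeping for outstanding temperature queries.
final class PendingQueries {
    private final Map<Long, String> deviceByRequestId = new HashMap<>();
    private long nextRequestId = 0;

    /** Registers a query for a device and returns the request ID to put into the message. */
    long register(String deviceId) {
        long id = nextRequestId++;
        deviceByRequestId.put(id, deviceId);
        return id;
    }

    /** Matches a response to its query; empty if the ID is unknown or already answered. */
    Optional<String> complete(long requestId) {
        return Optional.ofNullable(deviceByRequestId.remove(requestId));
    }
}
```

With this in place, a response that arrives late, or twice because of a resend, can be recognized and ignored because its ID is no longer pending.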
Defining the device actor and its read protocol | |
As we learned in the Hello World example, each actor defines the type of | |
messages it will accept. Our device actor has the responsibility to use | |
the same ID parameter for the response of a given query, which would | |
make it look like the following. | |
Scala | |
import akka.actor.{ Actor, ActorLogging, Props } | |
object Device { | |
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId)) | |
final case class ReadTemperature(requestId: Long) | |
final case class RespondTemperature(requestId: Long, value: Option[Double]) | |
} | |
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging { | |
import Device._ | |
var lastTemperatureReading: Option[Double] = None | |
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId) | |
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId) | |
override def receive: Receive = { | |
case ReadTemperature(id) => | |
sender() ! RespondTemperature(id, lastTemperatureReading) | |
} | |
} | |
Java | |
import java.util.Optional; | |
import akka.actor.AbstractActor; | |
import akka.actor.Props; | |
import akka.event.Logging; | |
import akka.event.LoggingAdapter; | |
class Device extends AbstractActor { | |
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); | |
final String groupId; | |
final String deviceId; | |
public Device(String groupId, String deviceId) { | |
this.groupId = groupId; | |
this.deviceId = deviceId; | |
} | |
public static Props props(String groupId, String deviceId) { | |
return Props.create(Device.class, groupId, deviceId); | |
} | |
// Message fields are final so that message instances are immutable.
public static final class ReadTemperature {
final long requestId;
public ReadTemperature(long requestId) {
this.requestId = requestId;
}
}
public static final class RespondTemperature {
final long requestId;
final Optional<Double> value;
public RespondTemperature(long requestId, Optional<Double> value) {
this.requestId = requestId;
this.value = value;
}
}
Optional<Double> lastTemperatureReading = Optional.empty(); | |
@Override | |
public void preStart() { | |
log.info("Device actor {}-{} started", groupId, deviceId); | |
} | |
@Override | |
public void postStop() { | |
log.info("Device actor {}-{} stopped", groupId, deviceId); | |
} | |
@Override | |
public Receive createReceive() { | |
return receiveBuilder() | |
.match(ReadTemperature.class, r -> { | |
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); | |
}) | |
.build(); | |
} | |
} | |
Note in the code that: | |
- The props method, defined in the companion object (Scala) or as a
static method (Java), defines how to construct a Device actor. The
props parameters include an ID for the device and the group to which
it belongs, which we will use later.
- The companion object (Scala) or the Device class (Java) includes the
definitions of the messages we reasoned about previously.
- In the Device class, the value of lastTemperatureReading is
initially set to None (Scala) or Optional.empty() (Java), and the
actor will simply report it back if queried.
Testing the actor | |
Based on the simple actor above, we could write a simple test. In the
com.lightbend.akka.sample package in the test tree of your project, add
the following code to a DeviceSpec.scala (Scala) or DeviceTest.java
(Java) file. (The Scala example uses ScalaTest, but any other test
framework can be used with the Akka Testkit.)
You can run this test by running mvn test or by running test at the sbt | |
prompt. | |
Scala | |
"reply with empty reading if no temperature is known" in { | |
val probe = TestProbe() | |
val deviceActor = system.actorOf(Device.props("group", "device")) | |
deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref) | |
val response = probe.expectMsgType[Device.RespondTemperature] | |
response.requestId should ===(42) | |
response.value should ===(None) | |
} | |
Java | |
@Test | |
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() { | |
TestKit probe = new TestKit(system); | |
ActorRef deviceActor = system.actorOf(Device.props("group", "device")); | |
deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef()); | |
Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class); | |
assertEquals(42L, response.requestId); | |
assertEquals(Optional.empty(), response.value); | |
} | |
Now, the actor needs a way to change the state of the temperature when | |
it receives a message from the sensor. | |
Adding a write protocol | |
The purpose of the write protocol is to update the lastTemperatureReading
field when the actor receives a message that contains the temperature. | |
Again, it is tempting to define the write protocol as a very simple | |
message, something like this: | |
Scala | |
final case class RecordTemperature(value: Double) | |
Java | |
public static final class RecordTemperature { | |
final double value; | |
public RecordTemperature(double value) { | |
this.value = value; | |
} | |
} | |
However, this approach does not take into account that the sender of the | |
record temperature message can never be sure if the message was | |
processed or not. We have seen that Akka does not guarantee delivery of | |
these messages and leaves it to the application to provide success | |
notifications. In our case, we would like to send an acknowledgment to | |
the sender once we have updated our last temperature recording, e.g. a
TemperatureRecorded message (in Scala,
final case class TemperatureRecorded(requestId: Long)).
Just like in the case of temperature queries and responses, it is a good | |
idea to include an ID field to provide maximum flexibility. | |
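The ID in the acknowledgment is what makes resends possible on the sending side. The following plain-Java sketch shows one way a sender could track writes that have not been acknowledged yet. UnacknowledgedWrites and its methods are hypothetical names used for illustration only. The sketch does not decide when to resend, which would typically be driven by a timeout.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sender-side record of temperature writes awaiting acknowledgment.
final class UnacknowledgedWrites {
    // requestId -> value, kept in send order
    private final Map<Long, Double> pending = new LinkedHashMap<>();
    private long nextRequestId = 1;

    /** Records an outgoing write and returns the request ID to send with it. */
    long recordSent(double value) {
        long id = nextRequestId++;
        pending.put(id, value);
        return id;
    }

    /** Called when an acknowledgment with this ID arrives; repeated acks are harmless. */
    void acknowledged(long requestId) {
        pending.remove(requestId);
    }

    /** Request IDs that have not been acknowledged yet, oldest first. */
    List<Long> toResend() {
        return new ArrayList<>(pending.keySet());
    }
}
```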
Actor with read and write messages | |
Putting the read and write protocol together, the device actor looks | |
like the following example: | |
Scala | |
import akka.actor.{ Actor, ActorLogging, Props } | |
object Device { | |
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId)) | |
final case class RecordTemperature(requestId: Long, value: Double) | |
final case class TemperatureRecorded(requestId: Long) | |
final case class ReadTemperature(requestId: Long) | |
final case class RespondTemperature(requestId: Long, value: Option[Double]) | |
} | |
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging { | |
import Device._ | |
var lastTemperatureReading: Option[Double] = None | |
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId) | |
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId) | |
override def receive: Receive = { | |
case RecordTemperature(id, value) => | |
log.info("Recorded temperature reading {} with {}", value, id) | |
lastTemperatureReading = Some(value) | |
sender() ! TemperatureRecorded(id) | |
case ReadTemperature(id) => | |
sender() ! RespondTemperature(id, lastTemperatureReading) | |
} | |
} | |
Java | |
import java.util.Optional; | |
import akka.actor.AbstractActor; | |
import akka.actor.AbstractActor.Receive; | |
import akka.actor.Props; | |
import akka.event.Logging; | |
import akka.event.LoggingAdapter; | |
public class Device extends AbstractActor { | |
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); | |
final String groupId; | |
final String deviceId; | |
public Device(String groupId, String deviceId) { | |
this.groupId = groupId; | |
this.deviceId = deviceId; | |
} | |
public static Props props(String groupId, String deviceId) { | |
return Props.create(Device.class, groupId, deviceId); | |
} | |
public static final class RecordTemperature { | |
final long requestId; | |
final double value; | |
public RecordTemperature(long requestId, double value) { | |
this.requestId = requestId; | |
this.value = value; | |
} | |
} | |
public static final class TemperatureRecorded { | |
final long requestId; | |
public TemperatureRecorded(long requestId) { | |
this.requestId = requestId; | |
} | |
} | |
public static final class ReadTemperature { | |
final long requestId; | |
public ReadTemperature(long requestId) { | |
this.requestId = requestId; | |
} | |
} | |
public static final class RespondTemperature { | |
final long requestId; | |
final Optional<Double> value; | |
public RespondTemperature(long requestId, Optional<Double> value) { | |
this.requestId = requestId; | |
this.value = value; | |
} | |
} | |
Optional<Double> lastTemperatureReading = Optional.empty(); | |
@Override | |
public void preStart() { | |
log.info("Device actor {}-{} started", groupId, deviceId); | |
} | |
@Override | |
public void postStop() { | |
log.info("Device actor {}-{} stopped", groupId, deviceId); | |
} | |
@Override | |
public Receive createReceive() { | |
return receiveBuilder() | |
.match(RecordTemperature.class, r -> { | |
log.info("Recorded temperature reading {} with {}", r.value, r.requestId); | |
lastTemperatureReading = Optional.of(r.value); | |
getSender().tell(new TemperatureRecorded(r.requestId), getSelf()); | |
}) | |
.match(ReadTemperature.class, r -> { | |
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); | |
}) | |
.build(); | |
} | |
} | |
We should also write a new test case now, exercising both the read/query | |
and write/record functionality together: | |
Scala | |
"reply with latest temperature reading" in { | |
val probe = TestProbe() | |
val deviceActor = system.actorOf(Device.props("group", "device")) | |
deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref) | |
probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) | |
deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref) | |
val response1 = probe.expectMsgType[Device.RespondTemperature] | |
response1.requestId should ===(2) | |
response1.value should ===(Some(24.0)) | |
deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref) | |
probe.expectMsg(Device.TemperatureRecorded(requestId = 3)) | |
deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref) | |
val response2 = probe.expectMsgType[Device.RespondTemperature] | |
response2.requestId should ===(4) | |
response2.value should ===(Some(55.0)) | |
} | |
Java | |
@Test | |
public void testReplyWithLatestTemperatureReading() { | |
TestKit probe = new TestKit(system); | |
ActorRef deviceActor = system.actorOf(Device.props("group", "device")); | |
deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef()); | |
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); | |
deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef()); | |
Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class); | |
assertEquals(2L, response1.requestId); | |
assertEquals(Optional.of(24.0), response1.value); | |
deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef()); | |
assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); | |
deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef()); | |
Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class); | |
assertEquals(4L, response2.requestId); | |
assertEquals(Optional.of(55.0), response2.value); | |
} | |
What’s Next? | |
So far, we have started designing our overall architecture, and we wrote | |
the first actor that directly corresponds to the domain. We now have to | |
create the component that is responsible for maintaining groups of | |
devices and the device actors themselves. | |
PART 4: WORKING WITH DEVICE GROUPS | |
Let’s take a closer look at the main functionality required by our use | |
case. In a complete IoT system for monitoring home temperatures, the | |
steps for connecting a device sensor to our system might look like this: | |
1. A sensor device in the home connects through some protocol. | |
2. The component managing network connections accepts the connection. | |
3. The sensor provides its group and device ID to register with the | |
device manager component of our system. | |
4. The device manager component handles registration by looking up or | |
creating the actor responsible for keeping sensor state. | |
5. The actor responds with an acknowledgement, exposing its ActorRef. | |
6. The networking component now uses the ActorRef for communication | |
between the sensor and device actor without going through the device | |
manager. | |
Steps 1 and 2 take place outside the boundaries of our tutorial system. | |
In this chapter, we will start addressing steps 3-6 and create a way for | |
sensors to register with our system and to communicate with actors. But | |
first, we have another architectural decision — how many levels of | |
actors should we use to represent device groups and device sensors? | |
One of the main design challenges for Akka programmers is choosing the | |
best granularity for actors. In practice, depending on the | |
characteristics of the interactions between actors, there are usually | |
several valid ways to organize a system. In our use case, for example, | |
it would be possible to have a single actor maintain all the groups and | |
devices — perhaps using hash maps. It would also be reasonable to have | |
an actor for each group that tracks the state of all devices in the same | |
home. | |
The following guidelines help us choose the most appropriate actor | |
hierarchy: | |
- In general, prefer larger granularity. Introducing more fine-grained | |
actors than needed causes more problems than it solves. | |
- Add finer granularity when the system requires: | |
- Higher concurrency. | |
- Complex conversations between actors that have many states. We | |
will see a very good example for this in the next chapter. | |
- Sufficient state that it makes sense to divide into smaller | |
actors. | |
- Multiple unrelated responsibilities. Using separate actors
allows individual actors to fail and be restored with little
impact on others.
Device manager hierarchy | |
Considering the principles outlined in the previous section, we will
model the device manager component as an actor tree with three levels: | |
- The top level supervisor actor represents the system component for | |
devices. It is also the entry point to look up and create device | |
group and device actors. | |
- At the next level, group actors each supervise the device actors for | |
one group id (e.g. one home). They also provide services, such as | |
querying temperature readings from all of the available devices in | |
their group. | |
- Device actors manage all the interactions with the actual device | |
sensors, such as storing temperature readings. | |
[device manager tree] | |
We chose this three-layered architecture for these reasons: | |
- Having groups of individual actors: | |
- Isolates failures that occur in a group. If a single actor | |
managed all device groups, an error in one group that causes a | |
restart would wipe out the state of groups that are otherwise | |
non-faulty. | |
- Simplifies the problem of querying all the devices belonging to | |
a group. Each group actor only contains state related to its | |
group. | |
- Increases parallelism in the system. Since each group has a | |
dedicated actor, they run concurrently and we can query multiple | |
groups concurrently. | |
- Having sensors modeled as individual device actors: | |
- Isolates failures of one device actor from the rest of the | |
devices in the group. | |
- Increases the parallelism of collecting temperature readings. | |
Network connections from different sensors communicate with | |
their individual device actors directly, reducing contention | |
points. | |
With the architecture defined, we can start working on the protocol for | |
registering sensors. | |
The Registration Protocol | |
As the first step, we need to design the protocol both for registering a | |
device and for creating the group and device actors that will be | |
responsible for it. This protocol will be provided by the DeviceManager | |
component itself because that is the only actor that is known and | |
available up front: device groups and device actors are created | |
on-demand. | |
Looking at registration in more detail, we can outline the necessary | |
functionality: | |
1. When a DeviceManager receives a request with a group and device id: | |
- If the manager already has an actor for the device group, it | |
forwards the request to it. | |
- Otherwise, it creates a new device group actor and then forwards | |
the request. | |
2. The DeviceGroup actor receives the request to register an actor for | |
the given device: | |
- If the group already has an actor for the device, the group | |
actor forwards the request to the device actor. | |
- Otherwise, the DeviceGroup actor first creates a device actor | |
and then forwards the request. | |
3. The device actor receives the request and sends an acknowledgement | |
to the original sender. Since the device actor acknowledges receipt | |
(instead of the group actor), the sensor will now have the ActorRef | |
to send messages directly to its actor. | |
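The look-up-or-create logic in steps 1 and 2 can be sketched without any actors at all. The following plain-Java example uses nested maps and hypothetical class names (ManagerSketch, GroupEntry, DeviceEntry). It only models which entries get created and reused. In the real system each level is an actor and the request is passed along as a message.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for a device actor.
final class DeviceEntry {
    final String groupId;
    final String deviceId;

    DeviceEntry(String groupId, String deviceId) {
        this.groupId = groupId;
        this.deviceId = deviceId;
    }
}

// Plain-Java stand-in for a device group actor.
final class GroupEntry {
    private final String groupId;
    private final Map<String, DeviceEntry> devices = new HashMap<>();

    GroupEntry(String groupId) {
        this.groupId = groupId;
    }

    /** Step 2: reuse the device entry if it exists, otherwise create it. */
    DeviceEntry track(String deviceId) {
        return devices.computeIfAbsent(deviceId, id -> new DeviceEntry(groupId, id));
    }
}

// Plain-Java stand-in for the device manager.
final class ManagerSketch {
    private final Map<String, GroupEntry> groups = new HashMap<>();

    /** Step 1: reuse the group entry if it exists, otherwise create it, then delegate. */
    DeviceEntry track(String groupId, String deviceId) {
        return groups.computeIfAbsent(groupId, GroupEntry::new).track(deviceId);
    }

    int groupCount() {
        return groups.size();
    }
}
```

Registering the same group and device ID twice returns the same entry, which mirrors the requirement that repeated registration requests reach the already existing device actor.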
The messages that we will use to communicate registration requests and | |
their acknowledgement have a simple definition: | |
Scala | |
final case class RequestTrackDevice(groupId: String, deviceId: String) | |
case object DeviceRegistered | |
Java | |
public static final class RequestTrackDevice { | |
public final String groupId; | |
public final String deviceId; | |
public RequestTrackDevice(String groupId, String deviceId) { | |
this.groupId = groupId; | |
this.deviceId = deviceId; | |
} | |
} | |
public static final class DeviceRegistered { | |
} | |
In this case we have not included a request ID field in the messages. | |
Since registration happens once, when the component connects the system | |
to some network protocol, the ID is not important. However, it is | |
usually a best practice to include a request ID. | |
Now, we’ll start implementing the protocol from the bottom up. In | |
practice, both a top-down and bottom-up approach can work, but in our | |
case, we benefit from the bottom-up approach as it allows us to | |
immediately write tests for the new features without mocking out parts | |
that we will need to build later. | |
Adding registration support to device actors | |
At the bottom of our hierarchy are the Device actors. Their job in the | |
registration process is simple: reply to the registration request with | |
an acknowledgment to the sender. It is also prudent to add a safeguard | |
against requests that come with a mismatched group or device ID. | |
_We will assume that the ID of the sender of the registration message is | |
preserved in the upper layers._ We will show you in the next section how | |
this can be achieved. | |
The device actor registration code looks like the following. Modify your | |
example to match. | |
Scala | |
object Device { | |
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId)) | |
final case class RecordTemperature(requestId: Long, value: Double) | |
final case class TemperatureRecorded(requestId: Long) | |
final case class ReadTemperature(requestId: Long) | |
final case class RespondTemperature(requestId: Long, value: Option[Double]) | |
} | |
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging { | |
import Device._ | |
var lastTemperatureReading: Option[Double] = None | |
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId) | |
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId) | |
override def receive: Receive = { | |
case DeviceManager.RequestTrackDevice(`groupId`, `deviceId`) => | |
sender() ! DeviceManager.DeviceRegistered | |
case DeviceManager.RequestTrackDevice(groupId, deviceId) => | |
log.warning(
"Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
groupId, deviceId, this.groupId, this.deviceId | |
) | |
case RecordTemperature(id, value) => | |
log.info("Recorded temperature reading {} with {}", value, id) | |
lastTemperatureReading = Some(value) | |
sender() ! TemperatureRecorded(id) | |
case ReadTemperature(id) => | |
sender() ! RespondTemperature(id, lastTemperatureReading) | |
} | |
} | |
Java | |
import akka.actor.AbstractActor; | |
import akka.actor.Props; | |
import akka.event.Logging; | |
import akka.event.LoggingAdapter; | |
import jdocs.tutorial_4.DeviceManager.DeviceRegistered; | |
import jdocs.tutorial_4.DeviceManager.RequestTrackDevice; | |
import java.util.Optional; | |
public class Device extends AbstractActor { | |
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); | |
final String groupId; | |
final String deviceId; | |
public Device(String groupId, String deviceId) { | |
this.groupId = groupId; | |
this.deviceId = deviceId; | |
} | |
public static Props props(String groupId, String deviceId) { | |
return Props.create(Device.class, groupId, deviceId); | |
} | |
public static final class RecordTemperature { | |
final long requestId; | |
final double value; | |
public RecordTemperature(long requestId, double value) { | |
this.requestId = requestId; | |
this.value = value; | |
} | |
} | |
public static final class TemperatureRecorded { | |
final long requestId; | |
public TemperatureRecorded(long requestId) { | |
this.requestId = requestId; | |
} | |
} | |
public static final class ReadTemperature { | |
final long requestId; | |
public ReadTemperature(long requestId) { | |
this.requestId = requestId; | |
} | |
} | |
public static final class RespondTemperature { | |
final long requestId; | |
final Optional<Double> value; | |
public RespondTemperature(long requestId, Optional<Double> value) { | |
this.requestId = requestId; | |
this.value = value; | |
} | |
} | |
Optional<Double> lastTemperatureReading = Optional.empty(); | |
@Override | |
public void preStart() { | |
log.info("Device actor {}-{} started", groupId, deviceId); | |
} | |
@Override | |
public void postStop() { | |
log.info("Device actor {}-{} stopped", groupId, deviceId); | |
} | |
@Override | |
public Receive createReceive() { | |
return receiveBuilder() | |
.match(RequestTrackDevice.class, r -> { | |
if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) { | |
getSender().tell(new DeviceRegistered(), getSelf()); | |
} else { | |
log.warning(
"Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
r.groupId, r.deviceId, this.groupId, this.deviceId | |
); | |
} | |
}) | |
.match(RecordTemperature.class, r -> { | |
log.info("Recorded temperature reading {} with {}", r.value, r.requestId); | |
lastTemperatureReading = Optional.of(r.value); | |
getSender().tell(new TemperatureRecorded(r.requestId), getSelf()); | |
}) | |
.match(ReadTemperature.class, r -> { | |
getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); | |
}) | |
.build(); | |
} | |
} | |
Note | |
In the Scala code, we used a feature of Scala pattern matching where we
can check whether a certain field equals an expected value. By bracketing
variables with backticks, like `variable`, the pattern will only match if
it contains the value of variable in that position.
We can now write two new test cases, one exercising successful | |
registration, the other testing the case when IDs don’t match: | |
Scala | |
"reply to registration requests" in { | |
val probe = TestProbe() | |
val deviceActor = system.actorOf(Device.props("group", "device")) | |
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref) | |
probe.expectMsg(DeviceManager.DeviceRegistered) | |
probe.lastSender should ===(deviceActor) | |
} | |
"ignore wrong registration requests" in { | |
val probe = TestProbe() | |
val deviceActor = system.actorOf(Device.props("group", "device")) | |
deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref) | |
probe.expectNoMsg(500.milliseconds) | |
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.ref)
probe.expectNoMsg(500.milliseconds) | |
} | |
Java | |
@Test | |
public void testReplyToRegistrationRequests() { | |
TestKit probe = new TestKit(system); | |
ActorRef deviceActor = system.actorOf(Device.props("group", "device")); | |
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef()); | |
probe.expectMsgClass(DeviceManager.DeviceRegistered.class); | |
assertEquals(deviceActor, probe.getLastSender()); | |
} | |
@Test | |
public void testIgnoreWrongRegistrationRequests() { | |
TestKit probe = new TestKit(system); | |
ActorRef deviceActor = system.actorOf(Device.props("group", "device")); | |
deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef()); | |
probe.expectNoMsg(); | |
deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef()); | |
probe.expectNoMsg(); | |
} | |
Note | |
We used the expectNoMsg() helper method from TestProbe (Scala) or
TestKit (Java). This
assertion waits until the defined time-limit and fails if it receives | |
any messages during this period. If no messages are received during the | |
waiting period, the assertion passes. It is usually a good idea to keep | |
these timeouts low (but not too low) because they add significant test | |
execution time. | |
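The cost of such an assertion is easier to see in a small model. The sketch below is plain Java and does not use the Akka TestKit: NoMessageCheck and noMessageWithin are hypothetical names, and a BlockingQueue stands in for the probe's inbox.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class NoMessageCheck {
    /** Waits up to timeoutMillis; returns true only if nothing arrived in that window. */
    static boolean noMessageWithin(BlockingQueue<?> inbox, long timeoutMillis) {
        try {
            // poll blocks until a message is available or the timeout elapses
            return inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

In this model the passing case always waits for the full timeout, while the failing case returns as soon as a message shows up. This is why every such assertion adds its timeout to the duration of a green test run.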
Adding registration support to device group actors | |
We are done with registration support at the device level; now we have
to implement it at the group level. A group actor has more work to do | |
when it comes to registrations, including: | |
- Handling the registration request by either forwarding it to an | |
existing device actor or by creating a new actor and forwarding the | |
message. | |
- Tracking which device actors exist in the group and removing them | |
from the group when they are stopped. | |
Handling the registration request | |
A device group actor must either forward the request to an existing
child or create one. To look up child actors by their device IDs, we
will use a Map[String, ActorRef] (Scala) or Map<String, ActorRef> (Java).
We also want to keep the original sender of the request so that our
device actor can reply directly. This is possible by using forward
instead of the ! operator (Scala) or the tell method (Java). The only
difference between the two is that forward keeps the original sender,
while ! or tell sets the sender to be the current actor. Just like with
our device actor, we ensure that we don’t respond to wrong group IDs.
Add the following to your source file:
Scala | |
object DeviceGroup { | |
def props(groupId: String): Props = Props(new DeviceGroup(groupId)) | |
} | |
class DeviceGroup(groupId: String) extends Actor with ActorLogging { | |
var deviceIdToActor = Map.empty[String, ActorRef] | |
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId) | |
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId) | |
override def receive: Receive = { | |
case trackMsg @ RequestTrackDevice(`groupId`, _) => | |
deviceIdToActor.get(trackMsg.deviceId) match { | |
case Some(deviceActor) => | |
deviceActor forward trackMsg | |
case None => | |
log.info("Creating device actor for {}", trackMsg.deviceId) | |
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}") | |
deviceIdToActor += trackMsg.deviceId -> deviceActor | |
deviceActor forward trackMsg | |
} | |
case RequestTrackDevice(groupId, deviceId) => | |
log.warning( | |
"Ignoring TrackDevice request for {}. This actor is responsible for {}.", | |
groupId, this.groupId | |
) | |
} | |
} | |
Java | |
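public class DeviceGroup extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
public DeviceGroup(String groupId) {
this.groupId = groupId;
}
public static Props props(String groupId) {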
return Props.create(DeviceGroup.class, groupId); | |
} | |
final Map<String, ActorRef> deviceIdToActor = new HashMap<>(); | |
@Override | |
public void preStart() { | |
log.info("DeviceGroup {} started", groupId); | |
} | |
@Override | |
public void postStop() { | |
log.info("DeviceGroup {} stopped", groupId); | |
} | |
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { | |
if (this.groupId.equals(trackMsg.groupId)) { | |
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId); | |
if (deviceActor != null) { | |
deviceActor.forward(trackMsg, getContext()); | |
} else { | |
log.info("Creating device actor for {}", trackMsg.deviceId); | |
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId); | |
deviceIdToActor.put(trackMsg.deviceId, deviceActor); | |
deviceActor.forward(trackMsg, getContext()); | |
} | |
} else { | |
log.warning( | |
"Ignoring TrackDevice request for {}. This actor is responsible for {}.", | |
groupId, this.groupId | |
); | |
} | |
} | |
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
.build();
}
} | |
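The difference between forward and tell described above can be made concrete with a toy model. The sketch below is not Akka code: ForwardSketch, Envelope, and Recorder are hypothetical names, and plain strings stand in for actor references. It only illustrates which sender the receiving side ends up seeing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Akka: a "recorder" just stores (message, sender) pairs.
final class ForwardSketch {
  // A message together with the name of whoever is recorded as its sender.
  static final class Envelope {
    final String message;
    final String sender;

    Envelope(String message, String sender) {
      this.message = message;
      this.sender = sender;
    }
  }

  // Stands in for the device actor; it remembers what it was sent.
  static final class Recorder {
    final List<Envelope> received = new ArrayList<>();

    void deliver(Envelope envelope) {
      received.add(envelope);
    }
  }

  // tell: the relaying party (self) becomes the sender.
  static void tell(Recorder target, String message, String self) {
    target.deliver(new Envelope(message, self));
  }

  // forward: the sender of the envelope being processed is preserved.
  static void forward(Recorder target, Envelope current) {
    target.deliver(new Envelope(current.message, current.sender));
  }
}
```

With forward, the device side sees the original requester as the sender and can reply to it directly. With tell, it would see the group, and the reply would go to the group instead.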
Just as we did with the device, we test this new functionality. We also | |
test that the actors returned for the two different IDs are actually | |
different, and we also attempt to record a temperature reading for each | |
of the devices to see if the actors are responding. | |
Scala | |
"be able to register a device actor" in { | |
val probe = TestProbe() | |
val groupActor = system.actorOf(DeviceGroup.props("group")) | |
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) | |
probe.expectMsg(DeviceManager.DeviceRegistered) | |
val deviceActor1 = probe.lastSender | |
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) | |
probe.expectMsg(DeviceManager.DeviceRegistered) | |
val deviceActor2 = probe.lastSender | |
deviceActor1 should !==(deviceActor2) | |
// Check that the device actors are working | |
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) | |
probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) | |
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) | |
probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) | |
} | |
"ignore requests for wrong groupId" in { | |
val probe = TestProbe() | |
val groupActor = system.actorOf(DeviceGroup.props("group")) | |
groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref) | |
probe.expectNoMsg(500.milliseconds) | |
} | |
Java | |
@Test | |
public void testRegisterDeviceActor() { | |
TestKit probe = new TestKit(system); | |
ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); | |
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); | |
probe.expectMsgClass(DeviceManager.DeviceRegistered.class); | |
ActorRef deviceActor1 = probe.getLastSender(); | |
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); | |
probe.expectMsgClass(DeviceManager.DeviceRegistered.class); | |
ActorRef deviceActor2 = probe.getLastSender(); | |
assertNotEquals(deviceActor1, deviceActor2); | |
// Check that the device actors are working | |
deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef()); | |
assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); | |
deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef()); | |
assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId); | |
} | |
@Test | |
public void testIgnoreRequestsForWrongGroupId() { | |
TestKit probe = new TestKit(system); | |
ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); | |
groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef()); | |
probe.expectNoMsg(); | |
} | |
If a device actor already exists for the registration request, we would | |
like to use the existing actor instead of a new one. We have not tested | |
this yet, so let us add a test for it:
Scala | |
"return same actor for same deviceId" in { | |
val probe = TestProbe() | |
val groupActor = system.actorOf(DeviceGroup.props("group")) | |
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) | |
probe.expectMsg(DeviceManager.DeviceRegistered) | |
val deviceActor1 = probe.lastSender | |
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) | |
probe.expectMsg(DeviceManager.DeviceRegistered) | |
val deviceActor2 = probe.lastSender | |
deviceActor1 should ===(deviceActor2) | |
} | |
Java | |
@Test | |
public void testReturnSameActorForSameDeviceId() { | |
TestKit probe = new TestKit(system); | |
ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); | |
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); | |
probe.expectMsgClass(DeviceManager.DeviceRegistered.class); | |
ActorRef deviceActor1 = probe.getLastSender(); | |
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); | |
probe.expectMsgClass(DeviceManager.DeviceRegistered.class); | |
ActorRef deviceActor2 = probe.getLastSender(); | |
assertEquals(deviceActor1, deviceActor2); | |
} | |
Keeping track of the device actors in the group | |
So far, we have implemented logic for registering device actors in the | |
group. Devices come and go, however, so we will need a way to remove | |
device actors from the Map[String, ActorRef] (Scala) or
Map<String, ActorRef> (Java). We will assume that when a device is
removed, its corresponding device
actor is simply stopped. Supervision, as we discussed earlier, only | |
handles error scenarios — not graceful stopping. So we need to notify | |
the parent when one of the device actors is stopped. | |
Akka provides a _Death Watch_ feature that allows an actor to _watch_ | |
another actor and be notified if the other actor is stopped. Unlike
supervision, watching is not limited to parent-child relationships: any
actor can watch any other actor as long as it knows the ActorRef. After
a watched actor stops, the watcher receives a Terminated(actorRef)
message, which also contains the reference to the watched actor. The
watcher can either handle this message explicitly or fail with a
DeathPactException. The latter is useful if the actor can no longer
perform its own duties after the watched actor has been stopped. In our
case, the group should still function after one device has been
stopped, so we need to handle the Terminated(actorRef) message.
Our device group actor needs to include functionality that: | |
1. Starts watching new device actors when they are created. | |
2. Removes a device actor from the Map[String, ActorRef] (Scala) or
Map<String, ActorRef> (Java), which maps device IDs to device actors,
when the notification indicates it has stopped.
Unfortunately, the Terminated message only contains the ActorRef of the
child actor. We need the actor’s ID to remove it from the map of
device IDs to device actors. To be able to do this removal, we need to
introduce a second map, Map[ActorRef, String] (Scala) or
Map<ActorRef, String> (Java), that allows us to find the device ID
corresponding to a given ActorRef.
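The two-map bookkeeping can be sketched independently of Akka. The following is a minimal illustration, assuming a hypothetical DeviceRegistry class in which plain strings stand in for ActorRef values. It shows only how the forward and reverse maps are kept in sync, not how the group actor itself is written.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Akka or the guide's sources.
// A String "ref" stands in for an ActorRef to keep the sketch self-contained.
final class DeviceRegistry {
  private final Map<String, String> deviceIdToRef = new HashMap<>();
  private final Map<String, String> refToDeviceId = new HashMap<>();

  // Record a newly created device under both keys.
  void register(String deviceId, String ref) {
    deviceIdToRef.put(deviceId, ref);
    refToDeviceId.put(ref, deviceId);
  }

  // Called when a termination notice carries only the ref.
  // Returns the removed device ID, or null if the ref was unknown.
  String removeByRef(String ref) {
    String deviceId = refToDeviceId.remove(ref);
    if (deviceId != null) {
      deviceIdToRef.remove(deviceId);
    }
    return deviceId;
  }

  // Read-only copy of the currently registered device IDs.
  Set<String> deviceIds() {
    return Collections.unmodifiableSet(new HashSet<>(deviceIdToRef.keySet()));
  }
}
```

The reverse map is what makes removal possible when only the reference is known. Without it, the group would have to scan every entry of the forward map to find the matching ID.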
Adding the functionality to identify the actor results in this: | |
Scala | |
class DeviceGroup(groupId: String) extends Actor with ActorLogging { | |
var deviceIdToActor = Map.empty[String, ActorRef] | |
var actorToDeviceId = Map.empty[ActorRef, String] | |
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId) | |
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId) | |
override def receive: Receive = { | |
case trackMsg @ RequestTrackDevice(`groupId`, _) => | |
deviceIdToActor.get(trackMsg.deviceId) match { | |
case Some(deviceActor) => | |
deviceActor forward trackMsg | |
case None => | |
log.info("Creating device actor for {}", trackMsg.deviceId) | |
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}") | |
context.watch(deviceActor) | |
actorToDeviceId += deviceActor -> trackMsg.deviceId | |
deviceIdToActor += trackMsg.deviceId -> deviceActor | |
deviceActor forward trackMsg | |
} | |
case RequestTrackDevice(groupId, deviceId) => | |
log.warning( | |
"Ignoring TrackDevice request for {}. This actor is responsible for {}.", | |
groupId, this.groupId | |
) | |
case Terminated(deviceActor) => | |
val deviceId = actorToDeviceId(deviceActor) | |
log.info("Device actor for {} has been terminated", deviceId) | |
actorToDeviceId -= deviceActor | |
deviceIdToActor -= deviceId | |
} | |
} | |
Java | |
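public class DeviceGroup extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
final String groupId;
public DeviceGroup(String groupId) {
this.groupId = groupId;
}
final Map<String, ActorRef> deviceIdToActor = new HashMap<>();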
final Map<ActorRef, String> actorToDeviceId = new HashMap<>(); | |
@Override | |
public void preStart() { | |
log.info("DeviceGroup {} started", groupId); | |
} | |
@Override | |
public void postStop() { | |
log.info("DeviceGroup {} stopped", groupId); | |
} | |
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { | |
if (this.groupId.equals(trackMsg.groupId)) { | |
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId); | |
if (deviceActor != null) { | |
deviceActor.forward(trackMsg, getContext()); | |
} else { | |
log.info("Creating device actor for {}", trackMsg.deviceId); | |
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId); | |
getContext().watch(deviceActor); | |
actorToDeviceId.put(deviceActor, trackMsg.deviceId); | |
deviceIdToActor.put(trackMsg.deviceId, deviceActor); | |
deviceActor.forward(trackMsg, getContext()); | |
} | |
} else { | |
log.warning( | |
"Ignoring TrackDevice request for {}. This actor is responsible for {}.", | |
groupId, this.groupId | |
); | |
} | |
} | |
private void onTerminated(Terminated t) {
ActorRef deviceActor = t.getActor();
String deviceId = actorToDeviceId.get(deviceActor);
log.info("Device actor for {} has been terminated", deviceId);
actorToDeviceId.remove(deviceActor);
deviceIdToActor.remove(deviceId);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
.match(Terminated.class, this::onTerminated)
.build();
} | |
} | |
So far we have no means to find out which devices the device group actor
keeps track of and, therefore, we cannot test our new functionality yet.
To make it testable, we add a new query capability (the message
RequestDeviceList(requestId: Long) in Scala, RequestDeviceList in Java)
that simply lists the currently active device IDs:
Scala | |
object DeviceGroup { | |
def props(groupId: String): Props = Props(new DeviceGroup(groupId)) | |
final case class RequestDeviceList(requestId: Long) | |
final case class ReplyDeviceList(requestId: Long, ids: Set[String]) | |
} | |
class DeviceGroup(groupId: String) extends Actor with ActorLogging { | |
var deviceIdToActor = Map.empty[String, ActorRef] | |
var actorToDeviceId = Map.empty[ActorRef, String] | |
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId) | |
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId) | |
override def receive: Receive = { | |
case trackMsg @ RequestTrackDevice(`groupId`, _) => | |
deviceIdToActor.get(trackMsg.deviceId) match { | |
case Some(deviceActor) => | |
deviceActor forward trackMsg | |
case None => | |
log.info("Creating device actor for {}", trackMsg.deviceId) | |
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}") | |
context.watch(deviceActor) | |
actorToDeviceId += deviceActor -> trackMsg.deviceId | |
deviceIdToActor += trackMsg.deviceId -> deviceActor | |
deviceActor forward trackMsg | |
} | |
case RequestTrackDevice(groupId, deviceId) => | |
log.warning( | |
"Ignoring TrackDevice request for {}. This actor is responsible for {}.", | |
groupId, this.groupId | |
) | |
case RequestDeviceList(requestId) => | |
sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet) | |
case Terminated(deviceActor) => | |
val deviceId = actorToDeviceId(deviceActor) | |
log.info("Device actor for {} has been terminated", deviceId) | |
actorToDeviceId -= deviceActor | |
deviceIdToActor -= deviceId | |
} | |
} | |
Java | |
public class DeviceGroup extends AbstractActor { | |
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); | |
final String groupId; | |
public DeviceGroup(String groupId) { | |
this.groupId = groupId; | |
} | |
public static Props props(String groupId) { | |
return Props.create(DeviceGroup.class, groupId); | |
} | |
public static final class RequestDeviceList { | |
final long requestId; | |
public RequestDeviceList(long requestId) { | |
this.requestId = requestId; | |
} | |
} | |
public static final class ReplyDeviceList { | |
final long requestId; | |
final Set<String> ids; | |
public ReplyDeviceList(long requestId, Set<String> ids) { | |
this.requestId = requestId; | |
this.ids = ids; | |
} | |
} | |
final Map<String, ActorRef> deviceIdToActor = new HashMap<>(); | |
final Map<ActorRef, String> actorToDeviceId = new HashMap<>(); | |
@Override | |
public void preStart() { | |
log.info("DeviceGroup {} started", groupId); | |
} | |
@Override | |
public void postStop() { | |
log.info("DeviceGroup {} stopped", groupId); | |
} | |
private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) { | |
if (this.groupId.equals(trackMsg.groupId)) { | |
ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId); | |
if (deviceActor != null) { | |
deviceActor.forward(trackMsg, getContext()); | |
} else { | |
log.info("Creating device actor for {}", trackMsg.deviceId); | |
deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId); | |
getContext().watch(deviceActor); | |
actorToDeviceId.put(deviceActor, trackMsg.deviceId); | |
deviceIdToActor.put(trackMsg.deviceId, deviceActor); | |
deviceActor.forward(trackMsg, getContext()); | |
} | |
} else { | |
log.warning( | |
"Ignoring TrackDevice request for {}. This actor is responsible for {}.", | |
groupId, this.groupId | |
); | |
} | |
} | |
private void onDeviceList(RequestDeviceList r) { | |
getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf()); | |
} | |
private void onTerminated(Terminated t) { | |
ActorRef deviceActor = t.getActor(); | |
String deviceId = actorToDeviceId.get(deviceActor); | |
log.info("Device actor for {} has been terminated", deviceId); | |
actorToDeviceId.remove(deviceActor); | |
deviceIdToActor.remove(deviceId); | |
} | |
@Override | |
public Receive createReceive() { | |
return receiveBuilder() | |
.match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice) | |
.match(RequestDeviceList.class, this::onDeviceList) | |
.match(Terminated.class, this::onTerminated) | |
.build(); | |
} | |
} | |
We are almost ready to test the removal of devices. But we still need
the following capabilities:
- To stop a device actor from our test case. From the outside, any
actor can be stopped by simply sending it a special built-in
message, PoisonPill, which instructs the actor to stop.
- To be notified once the device actor is stopped. We can use the
_Death Watch_ facility for this purpose, too. The TestProbe (Scala)
or TestKit (Java) has two methods that we can easily use: watch() to
watch a specific actor, and expectTerminated to assert that the
watched actor has been terminated.
We add two more test cases now. In the first, we just test that we get | |
back the list of proper IDs once we have added a few devices. The second | |
test case makes sure that the device ID is properly removed after the | |
device actor has been stopped: | |
Scala | |
"be able to list active devices" in { | |
val probe = TestProbe() | |
val groupActor = system.actorOf(DeviceGroup.props("group")) | |
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) | |
probe.expectMsg(DeviceManager.DeviceRegistered) | |
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) | |
probe.expectMsg(DeviceManager.DeviceRegistered) | |
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) | |
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) | |
} | |
"be able to list active devices after one shuts down" in { | |
val probe = TestProbe() | |
val groupActor = system.actorOf(DeviceGroup.props("group")) | |
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) | |
probe.expectMsg(DeviceManager.DeviceRegistered) | |
val toShutDown = probe.lastSender | |
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) | |
probe.expectMsg(DeviceManager.DeviceRegistered) | |
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) | |
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) | |
probe.watch(toShutDown) | |
toShutDown ! PoisonPill | |
probe.expectTerminated(toShutDown) | |
// using awaitAssert to retry because it might take longer for the groupActor | |
// to see the Terminated, that order is undefined | |
probe.awaitAssert { | |
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref) | |
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2"))) | |
} | |
} | |
Java | |
@Test | |
public void testListActiveDevices() { | |
TestKit probe = new TestKit(system); | |
ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); | |
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); | |
probe.expectMsgClass(DeviceManager.DeviceRegistered.class); | |
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); | |
probe.expectMsgClass(DeviceManager.DeviceRegistered.class); | |
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); | |
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); | |
assertEquals(0L, reply.requestId); | |
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); | |
} | |
@Test | |
public void testListActiveDevicesAfterOneShutsDown() { | |
TestKit probe = new TestKit(system); | |
ActorRef groupActor = system.actorOf(DeviceGroup.props("group")); | |
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef()); | |
probe.expectMsgClass(DeviceManager.DeviceRegistered.class); | |
ActorRef toShutDown = probe.getLastSender(); | |
groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef()); | |
probe.expectMsgClass(DeviceManager.DeviceRegistered.class); | |
groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef()); | |
DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); | |
assertEquals(0L, reply.requestId); | |
assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids); | |
probe.watch(toShutDown); | |
toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender()); | |
probe.expectTerminated(toShutDown); | |
// using awaitAssert to retry because it might take longer for the groupActor | |
// to see the Terminated, that order is undefined | |
probe.awaitAssert(() -> { | |
groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef()); | |
DeviceGroup.ReplyDeviceList r = | |
probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class); | |
assertEquals(1L, r.requestId); | |
assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids); | |
return null; | |
}); | |
} | |
Creating device manager actors | |
Going up to the next level in our hierarchy, we need to create the entry | |
point for our device manager component in the DeviceManager source file. | |
This actor is very similar to the device group actor, but creates device | |
group actors instead of device actors: | |
Scala | |
object DeviceManager { | |
def props(): Props = Props(new DeviceManager) | |
final case class RequestTrackDevice(groupId: String, deviceId: String) | |
case object DeviceRegistered | |
} | |
class DeviceManager extends Actor with ActorLogging { | |
var groupIdToActor = Map.empty[String, ActorRef] | |
var actorToGroupId = Map.empty[ActorRef, String] | |
override def preStart(): Unit = log.info("DeviceManager started") | |
override def postStop(): Unit = log.info("DeviceManager stopped") | |
override def receive = { | |
case trackMsg @ RequestTrackDevice(groupId, _) => | |
groupIdToActor.get(groupId) match { | |
case Some(ref) => | |
ref forward trackMsg | |
case None => | |
log.info("Creating device group actor for {}", groupId) | |
val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId) | |
context.watch(groupActor) | |
groupActor forward trackMsg | |
groupIdToActor += groupId -> groupActor | |
actorToGroupId += groupActor -> groupId | |
} | |
case Terminated(groupActor) => | |
val groupId = actorToGroupId(groupActor) | |
log.info("Device group actor for {} has been terminated", groupId) | |
actorToGroupId -= groupActor | |
groupIdToActor -= groupId | |
} | |
} | |
Java | |
public class DeviceManager extends AbstractActor { | |
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); | |
public static Props props() { | |
return Props.create(DeviceManager.class); | |
} | |
public static final class RequestTrackDevice { | |
public final String groupId; | |
public final String deviceId; | |
public RequestTrackDevice(String groupId, String deviceId) { | |
this.groupId = groupId; | |
this.deviceId = deviceId; | |
} | |
} | |
public static final class DeviceRegistered { | |
} | |
final Map<String, ActorRef> groupIdToActor = new HashMap<>(); | |
final Map<ActorRef, String> actorToGroupId = new HashMap<>(); | |
@Override | |
public void preStart() { | |
log.info("DeviceManager started"); | |
} | |
@Override | |
public void postStop() { | |
log.info("DeviceManager stopped"); | |
} | |
private void onTrackDevice(RequestTrackDevice trackMsg) { | |
String groupId = trackMsg.groupId; | |
ActorRef ref = groupIdToActor.get(groupId); | |
if (ref != null) { | |
ref.forward(trackMsg, getContext()); | |
} else { | |
log.info("Creating device group actor for {}", groupId); | |
ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId); | |
getContext().watch(groupActor); | |
groupActor.forward(trackMsg, getContext()); | |
groupIdToActor.put(groupId, groupActor); | |
actorToGroupId.put(groupActor, groupId); | |
} | |
} | |
private void onTerminated(Terminated t) { | |
ActorRef groupActor = t.getActor(); | |
String groupId = actorToGroupId.get(groupActor); | |
log.info("Device group actor for {} has been terminated", groupId); | |
actorToGroupId.remove(groupActor); | |
groupIdToActor.remove(groupId); | |
} | |
@Override
public Receive createReceive() {
return receiveBuilder() | |
.match(RequestTrackDevice.class, this::onTrackDevice) | |
.match(Terminated.class, this::onTerminated) | |
.build(); | |
} | |
} | |
We leave tests of the device manager as an exercise for you since it is | |
very similar to the tests we have already written for the group actor. | |
What’s next? | |
We now have a hierarchical component for registering and tracking
devices and recording measurements. We have seen how to implement
different types of conversation patterns, such as:
- Request-respond (for temperature recordings)
- Delegate-respond (for registration of devices)
- Create-watch-terminate (for creating the group and device actors as
children)
In the next chapter, we will introduce group query capabilities, which | |
will establish a new conversation pattern of scatter-gather. In | |
particular, we will implement the functionality that allows users to | |
query the status of all the devices belonging to a group. | |
PART 5: QUERYING DEVICE GROUPS | |
The conversational patterns that we have seen so far are simple in the | |
sense that they require the actor to keep little or no state. | |
Specifically: | |
- Device actors return a reading, which requires no state change | |
- Device actors record a temperature, which updates a single field
- Device Group actors maintain group membership by simply adding or | |
removing entries from a map | |
In this part, we will use a more complex example. Since homeowners will | |
be interested in the temperatures throughout their home, our goal is to | |
be able to query all of the device actors in a group. Let us start by | |
investigating how such a query API should behave. | |
Dealing with possible scenarios | |
The very first issue we face is that the membership of a group is | |
dynamic. Each sensor device is represented by an actor that can stop at | |
any time. At the beginning of the query, we can ask all of the existing | |
device actors for the current temperature. However, during the lifecycle | |
of the query: | |
- A device actor might stop and not be able to respond back with a | |
temperature reading. | |
- A new device actor might start up and not be included in the query | |
because we weren’t aware of it. | |
These issues can be addressed in many different ways, but the important | |
point is to settle on the desired behavior. The following works well for | |
our use case: | |
- When a query arrives, the group actor takes a _snapshot_ of the | |
existing device actors and will only ask those actors for the | |
temperature. | |
- Actors that start up _after_ the query arrives are simply ignored. | |
- If an actor in the snapshot stops during the query without | |
answering, we will simply report the fact that it stopped to the | |
sender of the query message. | |
Apart from device actors coming and going dynamically, some actors might | |
take a long time to answer. For example, they could be stuck in an | |
accidental infinite loop, or fail due to a bug and drop our request. We | |
don’t want the query to continue indefinitely, so we will consider it | |
complete in either of the following cases: | |
- All actors in the snapshot have either responded or have confirmed | |
being stopped. | |
- We reach a pre-defined deadline. | |
Given these decisions, along with the fact that a device in the snapshot | |
might have just started and not yet received a temperature to record, we | |
can define four states for each device actor, with respect to a | |
temperature query: | |
- It has a temperature available: Temperature(value) (Scala) or
Temperature (Java).
- It has responded, but has no temperature available yet: | |
TemperatureNotAvailable. | |
- It has stopped before answering: DeviceNotAvailable. | |
- It did not respond before the deadline: DeviceTimedOut. | |
Summarizing these in message types we can add the following to | |
DeviceGroup: | |
Scala | |
final case class RequestAllTemperatures(requestId: Long) | |
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading]) | |
sealed trait TemperatureReading | |
final case class Temperature(value: Double) extends TemperatureReading | |
case object TemperatureNotAvailable extends TemperatureReading | |
case object DeviceNotAvailable extends TemperatureReading | |
case object DeviceTimedOut extends TemperatureReading | |
Java | |
public static final class RequestAllTemperatures { | |
final long requestId; | |
public RequestAllTemperatures(long requestId) { | |
this.requestId = requestId; | |
} | |
} | |
public static final class RespondAllTemperatures { | |
final long requestId; | |
final Map<String, TemperatureReading> temperatures; | |
public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) { | |
this.requestId = requestId; | |
this.temperatures = temperatures; | |
} | |
} | |
public static interface TemperatureReading { | |
} | |
public static final class Temperature implements TemperatureReading { | |
public final double value; | |
public Temperature(double value) { | |
this.value = value; | |
} | |
} | |
public static final class TemperatureNotAvailable implements TemperatureReading { | |
} | |
public static final class DeviceNotAvailable implements TemperatureReading { | |
} | |
public static final class DeviceTimedOut implements TemperatureReading { | |
} | |
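The decisions above (snapshot at query start, one outcome per device, completion on full response or deadline) can be sketched as plain bookkeeping. This is an illustration only, not the query actor the guide builds: QuerySketch is a hypothetical class, and strings stand in for device actors and for the four reading types.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical bookkeeping for a single query; not Akka code.
// Outcome strings mirror the four states named in the text.
final class QuerySketch {
  private final Set<String> stillWaiting;
  private final Map<String, String> results = new HashMap<>();

  // Take the snapshot: devices that appear later are never added.
  QuerySketch(Set<String> snapshotOfDeviceIds) {
    this.stillWaiting = new HashSet<>(snapshotOfDeviceIds);
  }

  // A device replied; a null value means it has no reading yet.
  void onReply(String deviceId, Double value) {
    if (stillWaiting.remove(deviceId)) {
      results.put(deviceId,
          value == null ? "TemperatureNotAvailable" : "Temperature(" + value + ")");
    }
  }

  // A device in the snapshot stopped before answering.
  void onStopped(String deviceId) {
    if (stillWaiting.remove(deviceId)) {
      results.put(deviceId, "DeviceNotAvailable");
    }
  }

  // The deadline passed: everything still pending is marked as timed out.
  void onDeadline() {
    for (String deviceId : stillWaiting) {
      results.put(deviceId, "DeviceTimedOut");
    }
    stillWaiting.clear();
  }

  // True once every device in the snapshot has an outcome.
  boolean isComplete() {
    return stillWaiting.isEmpty();
  }

  // Copy of the outcomes collected so far, keyed by device ID.
  Map<String, String> results() {
    return new HashMap<>(results);
  }
}
```

Because each update first checks membership in the pending set, replies from devices outside the snapshot and duplicate notifications are ignored, which matches the behavior chosen above.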
Implementing the query | |
One approach for implementing the query involves adding code to the
device group actor. However, in practice this can be very cumbersome and
error-prone. Remember that when we start a query, we need to take a
snapshot of the devices present and start a timer so that we can enforce | |
the deadline. In the meantime, _another query_ can arrive. For the | |
second query, of course, we need to keep track of the exact same | |
information but in isolation from the previous query. This would require | |
us to maintain separate mappings between queries and device actors. | |
Instead, we will implement a simpler and superior approach. We will
create an actor that represents a _single query_ and that performs the
tasks needed to complete the query on behalf of the group actor. So far
we have created actors that belonged to classical domain objects, but
now we will create an actor that represents a process or a task rather
than an entity. We benefit by keeping our device group actor simple and
being able to better test the query capability in isolation.
Defining the query actor | |
First, we need to design the lifecycle of our query actor. This consists
of identifying its initial state, the first action it will take, and any
cleanup that is necessary. The query actor will need the following
information:
- The snapshot and IDs of active device actors to query. | |
- The ID of the request that started the query (so that we can include | |
it in the reply). | |
- The reference of the actor that sent the query. We will send the
reply to this actor directly.
- A deadline that indicates how long the query should wait for | |
replies. Making this a parameter will simplify testing. | |
Scheduling the query timeout | |
Since we need a way to indicate how long we are willing to wait for | |
responses, it is time to introduce a new Akka feature that we have not | |
used yet, the built-in scheduler facility. Using the scheduler is | |
simple: | |
- We get the scheduler from the ActorSystem, which, in turn, is
accessible from the actor’s context: context.system.scheduler in
Scala, getContext().getSystem().scheduler() in Java. Scheduling
needs an ExecutionContext (implicit in Scala), which is basically the
thread-pool that will execute the timer task itself. In our case, we
use the same dispatcher as the actor, by importing
context.dispatcher in Scala or by passing in
getContext().dispatcher() in Java.
- The method scheduler.scheduleOnce(time, actorRef, message) in Scala,
or scheduler.scheduleOnce(time, actorRef, message, executor, sender)
in Java, will send message to the actor actorRef once the specified
time has elapsed.
We need to create a message that represents the query timeout. We create | |
a simple message CollectionTimeout without any parameters for this | |
purpose. The return value from scheduleOnce is a Cancellable which can | |
be used to cancel the timer if the query finishes successfully in time. | |
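The schedule-then-cancel pattern can be tried without Akka. The following is a minimal sketch that uses the JDK's ScheduledExecutorService as a stand-in for the Akka scheduler; the class TimeoutSketch and its method are hypothetical names, and ScheduledFuture plays the role that Cancellable plays in Akka.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the scheduleOnce/Cancellable pattern; this is
// plain JDK code, not the Akka scheduler API.
final class TimeoutSketch {

  // Schedules a one-shot "timeout", then cancels it as a query would do
  // when it finishes early. Returns true if the timeout task never ran.
  static boolean cancelBeforeDeadline() {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicBoolean timedOut = new AtomicBoolean(false);
    try {
      // Analogous to scheduleOnce(timeout, self, CollectionTimeout).
      ScheduledFuture<?> timer =
          scheduler.schedule(() -> timedOut.set(true), 10, TimeUnit.SECONDS);
      // The query completed in time, so the pending timer is cancelled,
      // much like queryTimeoutTimer.cancel() in the query actor.
      boolean cancelled = timer.cancel(false);
      return cancelled && !timedOut.get();
    } finally {
      scheduler.shutdownNow();
    }
  }
}
```

Cancelling the timer when the work finishes avoids delivering a timeout to something that no longer needs it, which is the same reason the query actor cancels its timer in postStop.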
At the start of the query, we need to ask each of the device actors for | |
the current temperature. To be able to quickly detect devices that | |
stopped before they got the ReadTemperature message, we will also watch
each of the actors. This way, we get Terminated messages for those that | |
stop during the lifetime of the query, so we don’t need to wait until | |
the timeout to mark these as not available. | |
Putting this together, the outline of our DeviceGroupQuery actor looks | |
like this: | |
Scala | |
object DeviceGroupQuery { | |
case object CollectionTimeout | |
def props( | |
actorToDeviceId: Map[ActorRef, String], | |
requestId: Long, | |
requester: ActorRef, | |
timeout: FiniteDuration | |
): Props = { | |
Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout)) | |
} | |
} | |
class DeviceGroupQuery( | |
actorToDeviceId: Map[ActorRef, String], | |
requestId: Long, | |
requester: ActorRef, | |
timeout: FiniteDuration | |
) extends Actor with ActorLogging { | |
import DeviceGroupQuery._ | |
import context.dispatcher | |
val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout) | |
override def preStart(): Unit = { | |
actorToDeviceId.keysIterator.foreach { deviceActor => | |
context.watch(deviceActor) | |
deviceActor ! Device.ReadTemperature(0) | |
} | |
} | |
override def postStop(): Unit = { | |
queryTimeoutTimer.cancel() | |
} | |
} | |
Java | |
public class DeviceGroupQuery extends AbstractActor { | |
public static final class CollectionTimeout { | |
} | |
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); | |
final Map<ActorRef, String> actorToDeviceId; | |
final long requestId; | |
final ActorRef requester; | |
Cancellable queryTimeoutTimer; | |
public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { | |
this.actorToDeviceId = actorToDeviceId; | |
this.requestId = requestId; | |
this.requester = requester; | |
queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce( | |
timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf() | |
); | |
} | |
public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { | |
return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout); | |
} | |
@Override | |
public void preStart() { | |
for (ActorRef deviceActor : actorToDeviceId.keySet()) { | |
getContext().watch(deviceActor); | |
deviceActor.tell(new Device.ReadTemperature(0L), getSelf()); | |
} | |
} | |
@Override | |
public void postStop() { | |
queryTimeoutTimer.cancel(); | |
} | |
} | |
Tracking actor state | |
The query actor, apart from the pending timer, has one stateful aspect:
tracking which actors have replied, have stopped, or have not yet
replied. One way to track this state is to create a mutable field in the
actor (a var in Scala). A different approach takes advantage of the ability to
change how an actor responds to messages. A Receive is just a function | |
(or an object, if you like) that can be returned from another function. | |
By default, the receive block defines the behavior of the actor, but it | |
is possible to change it multiple times during the life of the actor. We | |
simply call context.become(newBehavior) where newBehavior is anything | |
with type Receive (which is just a shorthand for | |
PartialFunction[Any, Unit]). We will leverage this feature to track the | |
state of our actor. | |
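To make the idea of swapping the message handler concrete, here is a minimal single-threaded sketch in plain Java. It only models the concept and is not the Akka API; BecomeSketch, greeting, and farewell are hypothetical names, and the private become method merely imitates what context.become does for a real actor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of "become"; not the Akka API.
final class BecomeSketch {
  // Records what each handler did, so the effect of the swap is visible.
  final List<String> log = new ArrayList<>();
  // The currently installed message handler.
  private Consumer<String> behavior;

  BecomeSketch() {
    behavior = this::greeting; // initial behavior
  }

  // Dispatches a message to whatever behavior is installed right now.
  void receive(String message) {
    behavior.accept(message);
  }

  // Replaces the handler used for all subsequent messages.
  private void become(Consumer<String> newBehavior) {
    behavior = newBehavior;
  }

  private void greeting(String message) {
    log.add("hello " + message);
    become(this::farewell);
  }

  private void farewell(String message) {
    log.add("goodbye " + message);
  }
}
```

Sending two messages to this object runs greeting for the first and farewell for the second, because the first handler installed a different one before returning.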
For our use case: | |
1. Instead of defining receive directly, we delegate to a
waitingForReplies function to create the Receive.
2. The waitingForReplies function will keep track of two changing
values:
- a Map of already received replies
- a Set of actors that we still wait on
3. We have three events to act on:
- We can receive a RespondTemperature message from one of the devices.
- We can receive a Terminated message for a device actor that has been
stopped in the meantime.
- We can reach the deadline and receive a CollectionTimeout.
In the first two cases, we need to keep track of the replies, which we | |
now simply delegate to a method receivedResponse, which we will discuss | |
later. In the case of timeout, we simply take all the actors
that have not yet replied (the members of the set stillWaiting) and
put a DeviceTimedOut as the status in the final reply. Then we reply to | |
the submitter of the query with the collected results and stop the query | |
actor. | |
To accomplish this, add the following to your DeviceGroupQuery source | |
file: | |
Scala | |
override def receive: Receive = | |
waitingForReplies( | |
Map.empty, | |
actorToDeviceId.keySet | |
) | |
def waitingForReplies( | |
repliesSoFar: Map[String, DeviceGroup.TemperatureReading], | |
stillWaiting: Set[ActorRef] | |
): Receive = { | |
case Device.RespondTemperature(0, valueOption) => | |
val deviceActor = sender() | |
val reading = valueOption match { | |
case Some(value) => DeviceGroup.Temperature(value) | |
case None => DeviceGroup.TemperatureNotAvailable | |
} | |
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar) | |
case Terminated(deviceActor) => | |
receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar) | |
case CollectionTimeout => | |
val timedOutReplies = | |
stillWaiting.map { deviceActor => | |
val deviceId = actorToDeviceId(deviceActor) | |
deviceId -> DeviceGroup.DeviceTimedOut | |
} | |
requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies) | |
context.stop(self) | |
} | |
Java | |
@Override | |
public Receive createReceive() { | |
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet()); | |
} | |
public Receive waitingForReplies( | |
Map<String, DeviceGroup.TemperatureReading> repliesSoFar, | |
Set<ActorRef> stillWaiting) { | |
return receiveBuilder() | |
.match(Device.RespondTemperature.class, r -> { | |
ActorRef deviceActor = getSender(); | |
DeviceGroup.TemperatureReading reading = r.value | |
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v)) | |
.orElse(new DeviceGroup.TemperatureNotAvailable()); | |
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar); | |
}) | |
.match(Terminated.class, t -> { | |
receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar); | |
}) | |
.match(CollectionTimeout.class, t -> { | |
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar); | |
for (ActorRef deviceActor : stillWaiting) { | |
String deviceId = actorToDeviceId.get(deviceActor); | |
replies.put(deviceId, new DeviceGroup.DeviceTimedOut()); | |
} | |
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf()); | |
getContext().stop(getSelf()); | |
}) | |
.build(); | |
} | |
It is not yet clear how we will “mutate” the repliesSoFar and
stillWaiting data structures. One important thing to note is that the | |
function waitingForReplies DOES NOT HANDLE THE MESSAGES DIRECTLY. IT | |
RETURNS A Receive FUNCTION THAT WILL HANDLE THE MESSAGES. This means | |
that if we call waitingForReplies again, with different parameters, then | |
it returns a brand new Receive that will use those new parameters. | |
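The distinction between a handler and a function that returns a handler can be shown with a small plain-Java sketch. This is only an illustration under simplified assumptions: device actors are represented by String ids, the handler is a java.util.function.Function rather than an Akka Receive, and HandlerFactorySketch is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical plain-Java illustration of a parameterized handler factory;
// names mirror the guide but nothing here is Akka API.
final class HandlerFactorySketch {

  // Does not process any message itself. It returns a handler that has
  // captured stillWaiting and reports how many devices would remain
  // after the given device has replied.
  static Function<String, Integer> waitingForReplies(Set<String> stillWaiting) {
    return deviceId -> {
      Set<String> remaining = new HashSet<>(stillWaiting);
      remaining.remove(deviceId);
      return remaining.size();
    };
  }
}
```

Each call to the factory yields an independent handler bound to the arguments of that call, so a handler created earlier keeps answering with its own captured set.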
We have seen how we can install the initial Receive by simply returning | |
it from receive. In order to install a new one, to record a new reply, | |
for example, we need some mechanism. This mechanism is the method | |
context.become(newReceive) which will _change_ the actor’s message | |
handling function to the provided newReceive function. You can imagine | |
that before starting, your actor automatically calls | |
context.become(receive), i.e. installing the Receive function that is | |
returned from receive. This is another important observation: IT IS NOT | |
receive THAT HANDLES THE MESSAGES, IT JUST RETURNS A Receive FUNCTION | |
THAT WILL ACTUALLY HANDLE THE MESSAGES. | |
We now have to figure out what to do in receivedResponse. First, we need | |
to record the new result in the map repliesSoFar and remove the actor | |
from stillWaiting. The next step is to check if there are any remaining | |
actors we are waiting for. If there are none, we send the result of the
query to the original requester and stop the query actor. Otherwise, we | |
need to update the repliesSoFar and stillWaiting structures and wait for | |
more messages. | |
In the code before, we treated Terminated as the implicit response | |
DeviceNotAvailable, so receivedResponse does not need to do anything | |
special. However, there is one small task we still need to do. It is | |
possible that we receive a proper response from a device actor, but then | |
it stops during the lifetime of the query. We don’t want this second | |
event to overwrite the already received reply. In other words, we don’t | |
want to receive Terminated after we recorded the response. This is | |
simple to achieve by calling context.unwatch(ref). This method also | |
ensures that we don’t receive Terminated events that are already in the | |
mailbox of the actor. It is also safe to call this multiple times; only
the first call has any effect, and the rest are simply ignored.
With all this knowledge, we can create the receivedResponse method: | |
Scala | |
def receivedResponse( | |
deviceActor: ActorRef, | |
reading: DeviceGroup.TemperatureReading, | |
stillWaiting: Set[ActorRef], | |
repliesSoFar: Map[String, DeviceGroup.TemperatureReading] | |
): Unit = { | |
context.unwatch(deviceActor) | |
val deviceId = actorToDeviceId(deviceActor) | |
val newStillWaiting = stillWaiting - deviceActor | |
val newRepliesSoFar = repliesSoFar + (deviceId -> reading) | |
if (newStillWaiting.isEmpty) { | |
requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar) | |
context.stop(self) | |
} else { | |
context.become(waitingForReplies(newRepliesSoFar, newStillWaiting)) | |
} | |
} | |
Java | |
public void receivedResponse(ActorRef deviceActor, | |
DeviceGroup.TemperatureReading reading, | |
Set<ActorRef> stillWaiting, | |
Map<String, DeviceGroup.TemperatureReading> repliesSoFar) { | |
getContext().unwatch(deviceActor); | |
String deviceId = actorToDeviceId.get(deviceActor); | |
Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting); | |
newStillWaiting.remove(deviceActor); | |
Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar); | |
newRepliesSoFar.put(deviceId, reading); | |
if (newStillWaiting.isEmpty()) { | |
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf()); | |
getContext().stop(getSelf()); | |
} else { | |
getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting)); | |
} | |
} | |
It is quite natural to ask at this point, what have we gained by using | |
the context.become() trick instead of just making the repliesSoFar and | |
stillWaiting structures mutable fields of the actor (i.e. vars)? In this | |
simple example, not that much. The value of this style of state keeping | |
becomes more evident when you suddenly have _more kinds_ of states. | |
Since each state might have temporary data that is relevant only to itself,
keeping these as fields would pollute the global state of the actor, | |
i.e. it is unclear what fields are used in what state. Using | |
parameterized Receive “factory” methods we can keep data private that is | |
only relevant to the state. It is still a good exercise to rewrite the
query using vars (Scala) or mutable fields (Java) instead of
context.become(). However, it
is recommended to get comfortable with the solution we have used here as | |
it helps structuring more complex actor code in a cleaner and more | |
maintainable way. | |
Our query actor is now done: | |
Scala | |
object DeviceGroupQuery { | |
case object CollectionTimeout | |
def props( | |
actorToDeviceId: Map[ActorRef, String], | |
requestId: Long, | |
requester: ActorRef, | |
timeout: FiniteDuration | |
): Props = { | |
Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout)) | |
} | |
} | |
class DeviceGroupQuery( | |
actorToDeviceId: Map[ActorRef, String], | |
requestId: Long, | |
requester: ActorRef, | |
timeout: FiniteDuration | |
) extends Actor with ActorLogging { | |
import DeviceGroupQuery._ | |
import context.dispatcher | |
val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout) | |
override def preStart(): Unit = { | |
actorToDeviceId.keysIterator.foreach { deviceActor => | |
context.watch(deviceActor) | |
deviceActor ! Device.ReadTemperature(0) | |
} | |
} | |
override def postStop(): Unit = { | |
queryTimeoutTimer.cancel() | |
} | |
override def receive: Receive = | |
waitingForReplies( | |
Map.empty, | |
actorToDeviceId.keySet | |
) | |
def waitingForReplies( | |
repliesSoFar: Map[String, DeviceGroup.TemperatureReading], | |
stillWaiting: Set[ActorRef] | |
): Receive = { | |
case Device.RespondTemperature(0, valueOption) => | |
val deviceActor = sender() | |
val reading = valueOption match { | |
case Some(value) => DeviceGroup.Temperature(value) | |
case None => DeviceGroup.TemperatureNotAvailable | |
} | |
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar) | |
case Terminated(deviceActor) => | |
receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar) | |
case CollectionTimeout => | |
val timedOutReplies = | |
stillWaiting.map { deviceActor => | |
val deviceId = actorToDeviceId(deviceActor) | |
deviceId -> DeviceGroup.DeviceTimedOut | |
} | |
requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies) | |
context.stop(self) | |
} | |
def receivedResponse( | |
deviceActor: ActorRef, | |
reading: DeviceGroup.TemperatureReading, | |
stillWaiting: Set[ActorRef], | |
repliesSoFar: Map[String, DeviceGroup.TemperatureReading] | |
): Unit = { | |
context.unwatch(deviceActor) | |
val deviceId = actorToDeviceId(deviceActor) | |
val newStillWaiting = stillWaiting - deviceActor | |
val newRepliesSoFar = repliesSoFar + (deviceId -> reading) | |
if (newStillWaiting.isEmpty) { | |
requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar) | |
context.stop(self) | |
} else { | |
context.become(waitingForReplies(newRepliesSoFar, newStillWaiting)) | |
} | |
} | |
} | |
Java | |
public class DeviceGroupQuery extends AbstractActor { | |
public static final class CollectionTimeout { | |
} | |
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); | |
final Map<ActorRef, String> actorToDeviceId; | |
final long requestId; | |
final ActorRef requester; | |
Cancellable queryTimeoutTimer; | |
public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { | |
this.actorToDeviceId = actorToDeviceId; | |
this.requestId = requestId; | |
this.requester = requester; | |
queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce( | |
timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf() | |
); | |
} | |
public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) { | |
return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout); | |
} | |
@Override | |
public void preStart() { | |
for (ActorRef deviceActor : actorToDeviceId.keySet()) { | |
getContext().watch(deviceActor); | |
deviceActor.tell(new Device.ReadTemperature(0L), getSelf()); | |
} | |
} | |
@Override | |
public void postStop() { | |
queryTimeoutTimer.cancel(); | |
} | |
@Override | |
public Receive createReceive() { | |
return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet()); | |
} | |
public Receive waitingForReplies( | |
Map<String, DeviceGroup.TemperatureReading> repliesSoFar, | |
Set<ActorRef> stillWaiting) { | |
return receiveBuilder() | |
.match(Device.RespondTemperature.class, r -> { | |
ActorRef deviceActor = getSender(); | |
DeviceGroup.TemperatureReading reading = r.value | |
.map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v)) | |
.orElse(new DeviceGroup.TemperatureNotAvailable()); | |
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar); | |
}) | |
.match(Terminated.class, t -> { | |
receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar); | |
}) | |
.match(CollectionTimeout.class, t -> { | |
Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar); | |
for (ActorRef deviceActor : stillWaiting) { | |
String deviceId = actorToDeviceId.get(deviceActor); | |
replies.put(deviceId, new DeviceGroup.DeviceTimedOut()); | |
} | |
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf()); | |
getContext().stop(getSelf()); | |
}) | |
.build(); | |
} | |
public void receivedResponse(ActorRef deviceActor, | |
DeviceGroup.TemperatureReading reading, | |
Set<ActorRef> stillWaiting, | |
Map<String, DeviceGroup.TemperatureReading> repliesSoFar) { | |
getContext().unwatch(deviceActor); | |
String deviceId = actorToDeviceId.get(deviceActor); | |
Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting); | |
newStillWaiting.remove(deviceActor); | |
Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar); | |
newRepliesSoFar.put(deviceId, reading); | |
if (newStillWaiting.isEmpty()) { | |
requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf()); | |
getContext().stop(getSelf()); | |
} else { | |
getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting)); | |
} | |
} | |
} | |
Testing the query actor | |
Now let’s verify the correctness of the query actor implementation. | |
There are various scenarios we need to test individually to make sure | |
everything works as expected. To be able to do this, we need to simulate | |
the device actors somehow to exercise various normal or failure | |
scenarios. Thankfully we took the list of collaborators (actually a Map) | |
as a parameter to the query actor, so we can easily pass in TestProbe
(Scala) or TestKit (Java) references. In our first test, we try out the
case when there
are two devices and both report a temperature: | |
Scala | |
"return temperature value for working devices" in { | |
val requester = TestProbe() | |
val device1 = TestProbe() | |
val device2 = TestProbe() | |
val queryActor = system.actorOf(DeviceGroupQuery.props( | |
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), | |
requestId = 1, | |
requester = requester.ref, | |
timeout = 3.seconds | |
)) | |
device1.expectMsg(Device.ReadTemperature(requestId = 0)) | |
device2.expectMsg(Device.ReadTemperature(requestId = 0)) | |
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) | |
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) | |
requester.expectMsg(DeviceGroup.RespondAllTemperatures( | |
requestId = 1, | |
temperatures = Map( | |
"device1" -> DeviceGroup.Temperature(1.0), | |
"device2" -> DeviceGroup.Temperature(2.0) | |
) | |
)) | |
} | |
Java | |
@Test | |
public void testReturnTemperatureValueForWorkingDevices() { | |
TestKit requester = new TestKit(system); | |
TestKit device1 = new TestKit(system); | |
TestKit device2 = new TestKit(system); | |
Map<ActorRef, String> actorToDeviceId = new HashMap<>(); | |
actorToDeviceId.put(device1.getRef(), "device1"); | |
actorToDeviceId.put(device2.getRef(), "device2"); | |
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( | |
actorToDeviceId, | |
1L, | |
requester.getRef(), | |
new FiniteDuration(3, TimeUnit.SECONDS))); | |
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); | |
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); | |
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); | |
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); | |
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); | |
assertEquals(1L, response.requestId); | |
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>(); | |
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); | |
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); | |
assertEqualTemperatures(expectedTemperatures, response.temperatures); | |
} | |
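The Java tests call assertEqualTemperatures, a helper that is not shown in this section. The Java message classes, as listed, do not override equals, so comparing the maps directly would compare readings by object identity. The following is a sketch of what such a comparison might look like, not the guide's own helper: it returns a boolean instead of using JUnit assertions, and TemperatureAssertions and its nested types are simplified, hypothetical stand-ins for the DeviceGroup classes so that the example compiles on its own.

```java
import java.util.Map;

// Sketch of a comparison helper. The nested types are simplified stand-ins
// for the DeviceGroup message classes so the example compiles on its own.
final class TemperatureAssertions {
  interface TemperatureReading {}

  static final class Temperature implements TemperatureReading {
    final double value;

    Temperature(double value) {
      this.value = value;
    }
  }

  static final class DeviceTimedOut implements TemperatureReading {}

  // Returns true when both maps hold the same device ids, each mapped to
  // the same kind of reading and, for Temperature readings, to the same
  // value within a small tolerance.
  static boolean equalTemperatures(
      Map<String, TemperatureReading> expected,
      Map<String, TemperatureReading> actual) {
    if (expected.size() != actual.size()) {
      return false;
    }
    for (Map.Entry<String, TemperatureReading> entry : expected.entrySet()) {
      TemperatureReading want = entry.getValue();
      TemperatureReading got = actual.get(entry.getKey());
      if (got == null || want.getClass() != got.getClass()) {
        return false;
      }
      if (want instanceof Temperature
          && Math.abs(((Temperature) want).value - ((Temperature) got).value) > 0.01) {
        return false;
      }
    }
    return true;
  }
}
```

Comparing by class and value keeps the tests independent of whether the message classes define equals; a JUnit-based variant would replace each `return false` with a failing assertion.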
That was the happy case, but we know that sometimes devices cannot | |
provide a temperature measurement. This scenario is just slightly | |
different from the previous: | |
Scala | |
"return TemperatureNotAvailable for devices with no readings" in { | |
val requester = TestProbe() | |
val device1 = TestProbe() | |
val device2 = TestProbe() | |
val queryActor = system.actorOf(DeviceGroupQuery.props( | |
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), | |
requestId = 1, | |
requester = requester.ref, | |
timeout = 3.seconds | |
)) | |
device1.expectMsg(Device.ReadTemperature(requestId = 0)) | |
device2.expectMsg(Device.ReadTemperature(requestId = 0)) | |
queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref) | |
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) | |
requester.expectMsg(DeviceGroup.RespondAllTemperatures( | |
requestId = 1, | |
temperatures = Map( | |
"device1" -> DeviceGroup.TemperatureNotAvailable, | |
"device2" -> DeviceGroup.Temperature(2.0) | |
) | |
)) | |
} | |
Java | |
@Test | |
public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() { | |
TestKit requester = new TestKit(system); | |
TestKit device1 = new TestKit(system); | |
TestKit device2 = new TestKit(system); | |
Map<ActorRef, String> actorToDeviceId = new HashMap<>(); | |
actorToDeviceId.put(device1.getRef(), "device1"); | |
actorToDeviceId.put(device2.getRef(), "device2"); | |
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( | |
actorToDeviceId, | |
1L, | |
requester.getRef(), | |
new FiniteDuration(3, TimeUnit.SECONDS))); | |
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); | |
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); | |
queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef()); | |
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); | |
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); | |
assertEquals(1L, response.requestId); | |
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>(); | |
expectedTemperatures.put("device1", new DeviceGroup.TemperatureNotAvailable()); | |
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); | |
assertEqualTemperatures(expectedTemperatures, response.temperatures); | |
} | |
We also know that sometimes device actors stop before answering:
Scala | |
"return DeviceNotAvailable if device stops before answering" in { | |
val requester = TestProbe() | |
val device1 = TestProbe() | |
val device2 = TestProbe() | |
val queryActor = system.actorOf(DeviceGroupQuery.props( | |
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), | |
requestId = 1, | |
requester = requester.ref, | |
timeout = 3.seconds | |
)) | |
device1.expectMsg(Device.ReadTemperature(requestId = 0)) | |
device2.expectMsg(Device.ReadTemperature(requestId = 0)) | |
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) | |
device2.ref ! PoisonPill | |
requester.expectMsg(DeviceGroup.RespondAllTemperatures( | |
requestId = 1, | |
temperatures = Map( | |
"device1" -> DeviceGroup.Temperature(1.0), | |
"device2" -> DeviceGroup.DeviceNotAvailable | |
) | |
)) | |
} | |
Java | |
@Test | |
public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() { | |
TestKit requester = new TestKit(system); | |
TestKit device1 = new TestKit(system); | |
TestKit device2 = new TestKit(system); | |
Map<ActorRef, String> actorToDeviceId = new HashMap<>(); | |
actorToDeviceId.put(device1.getRef(), "device1"); | |
actorToDeviceId.put(device2.getRef(), "device2"); | |
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( | |
actorToDeviceId, | |
1L, | |
requester.getRef(), | |
new FiniteDuration(3, TimeUnit.SECONDS))); | |
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); | |
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); | |
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); | |
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); | |
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); | |
assertEquals(1L, response.requestId); | |
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>(); | |
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); | |
expectedTemperatures.put("device2", new DeviceGroup.DeviceNotAvailable()); | |
assertEqualTemperatures(expectedTemperatures, response.temperatures); | |
} | |
If you remember, there is another case related to device actors | |
stopping. It is possible that we get a normal reply from a device actor, | |
but then receive a Terminated for the same actor later. In this case, we | |
would like to keep the first reply and not mark the device as | |
DeviceNotAvailable. We should test this, too: | |
Scala | |
"return temperature reading even if device stops after answering" in { | |
val requester = TestProbe() | |
val device1 = TestProbe() | |
val device2 = TestProbe() | |
val queryActor = system.actorOf(DeviceGroupQuery.props( | |
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), | |
requestId = 1, | |
requester = requester.ref, | |
timeout = 3.seconds | |
)) | |
device1.expectMsg(Device.ReadTemperature(requestId = 0)) | |
device2.expectMsg(Device.ReadTemperature(requestId = 0)) | |
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) | |
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) | |
device2.ref ! PoisonPill | |
requester.expectMsg(DeviceGroup.RespondAllTemperatures( | |
requestId = 1, | |
temperatures = Map( | |
"device1" -> DeviceGroup.Temperature(1.0), | |
"device2" -> DeviceGroup.Temperature(2.0) | |
) | |
)) | |
} | |
Java | |
@Test | |
public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() { | |
TestKit requester = new TestKit(system); | |
TestKit device1 = new TestKit(system); | |
TestKit device2 = new TestKit(system); | |
Map<ActorRef, String> actorToDeviceId = new HashMap<>(); | |
actorToDeviceId.put(device1.getRef(), "device1"); | |
actorToDeviceId.put(device2.getRef(), "device2"); | |
ActorRef queryActor = system.actorOf(DeviceGroupQuery.props( | |
actorToDeviceId, | |
1L, | |
requester.getRef(), | |
new FiniteDuration(3, TimeUnit.SECONDS))); | |
assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId); | |
assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId); | |
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef()); | |
queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef()); | |
device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender()); | |
DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class); | |
assertEquals(1L, response.requestId); | |
Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>(); | |
expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0)); | |
expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0)); | |
assertEqualTemperatures(expectedTemperatures, response.temperatures); | |
} | |
The final case is when not all devices respond in time. To keep our test | |
relatively fast, we will construct the DeviceGroupQuery actor with a | |
smaller timeout: | |
Scala | |
"return DeviceTimedOut if device does not answer in time" in { | |
val requester = TestProbe() | |
val device1 = TestProbe() | |
val device2 = TestProbe() | |
val queryActor = system.actorOf(DeviceGroupQuery.props( | |
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), | |
requestId = 1, | |
requester = requester.ref, | |
timeout = 1.second | |
)) | |
device1.expectMsg(Device.ReadTemperature(requestId = 0)) | |
device2.expectMsg(Device.ReadTemperature(requestId = 0)) | |
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) | |
requester.expectMsg(DeviceGroup.RespondAllTemperatures( | |
requestId = 1, | |
temperatures = Map( | |
"device1" -> DeviceGroup.Temperature(1.0), | |
"device2" -> DeviceGroup.DeviceTimedOut | |
) | |
)) | |
} | |
Java | |
@Test
public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
  TestKit requester = new TestKit(system);
  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);
  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");
  // Use a short query timeout so the test stays fast
  ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
      actorToDeviceId,
      1L,
      requester.getRef(),
      new FiniteDuration(1, TimeUnit.SECONDS)));
  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);
  // Only device1 answers; device2 stays silent until the query times out
  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
  DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(
      FiniteDuration.create(5, TimeUnit.SECONDS),
      DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);
  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.DeviceTimedOut());
  assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
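The timeout test above expects the reply to contain the reading that arrived in time plus a DeviceTimedOut entry for the silent device. The merge step behind that result can be sketched in plain Java, without Akka. This is only an illustration under stated assumptions: the class TimeoutMergeSketch, the method completeOnTimeout, and the use of strings in place of the DeviceGroup.TemperatureReading types are all hypothetical and not part of the guide's code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, Akka-free sketch of how a query could assemble its reply on timeout.
final class TimeoutMergeSketch {
    // Stand-in for DeviceGroup.DeviceTimedOut; a plain string keeps the sketch self-contained.
    static final String DEVICE_TIMED_OUT = "DeviceTimedOut";

    /**
     * Returns a new map holding every reading received so far, plus a
     * timed-out marker for each device that has not answered yet.
     * The input collections are left unmodified.
     */
    static Map<String, String> completeOnTimeout(Map<String, String> repliesSoFar,
                                                 Set<String> stillWaiting) {
        Map<String, String> result = new HashMap<>(repliesSoFar);
        for (String deviceId : stillWaiting) {
            result.put(deviceId, DEVICE_TIMED_OUT);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> repliesSoFar = new HashMap<>();
        repliesSoFar.put("device1", "Temperature(1.0)");
        Set<String> stillWaiting = new HashSet<>();
        stillWaiting.add("device2");
        // device1 keeps its reading, device2 is reported as timed out
        System.out.println(completeOnTimeout(repliesSoFar, stillWaiting));
    }
}
```

In the actor version, this step would run when the query's timeout message arrives, after which the query sends the combined map to the requester and stops itself.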
Our query now works as expected, so it is time to add this new
functionality to the DeviceGroup actor.
Adding query capability to the group | |
Including the query feature in the group actor is now fairly simple. We
did all the heavy lifting in the query actor itself; the group actor
only needs to create it with the right initial parameters.
Scala | |
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]
  var nextCollectionId = 0L
  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
  override def receive: Receive = {
    // ... other cases omitted
    case RequestAllTemperatures(requestId) =>
      // Delegate the whole query to a short-lived child actor
      context.actorOf(DeviceGroupQuery.props(
        actorToDeviceId = actorToDeviceId,
        requestId = requestId,
        requester = sender(),
        timeout = 3.seconds
      ))
  }
}
Java | |
public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  final String groupId;
  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }
  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, groupId);
  }
  public static final class RequestDeviceList {
    final long requestId;
    public RequestDeviceList(long requestId) {
      this.requestId = requestId;
    }
  }
  public static final class ReplyDeviceList {
    final long requestId;
    final Set<String> ids;
    public ReplyDeviceList(long requestId, Set<String> ids) {
      this.requestId = requestId;
      this.ids = ids;
    }
  }
  public static final class RequestAllTemperatures {
    final long requestId;
    public RequestAllTemperatures(long requestId) {
      this.requestId = requestId;
    }
  }
  public static final class RespondAllTemperatures {
    final long requestId;
    final Map<String, TemperatureReading> temperatures;
    public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
      this.requestId = requestId;
      this.temperatures = temperatures;
    }
  }
  public static interface TemperatureReading {
  }
  public static final class Temperature implements TemperatureReading {
    public final double value;
    public Temperature(double value) {
      this.value = value;
    }
  }
  public static final class TemperatureNotAvailable implements TemperatureReading {
  }
  public static final class DeviceNotAvailable implements TemperatureReading {
  }
  public static final class DeviceTimedOut implements TemperatureReading {
  }
  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  final long nextCollectionId = 0L;
  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }
  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }
  // Delegate the whole query to a short-lived child actor
  private void onAllTemperatures(RequestAllTemperatures r) {
    getContext().actorOf(DeviceGroupQuery.props(
        actorToDeviceId, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS)));
  }
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        // ... other cases omitted
        .match(RequestAllTemperatures.class, this::onAllTemperatures)
        .build();
  }
}
It is worth restating what we said at the beginning of the chapter. By
keeping the temporary state that is only relevant to the query itself in
a separate actor, we keep the group actor implementation very simple. It
delegates everything to child actors and therefore does not have to keep
state that is not relevant to its core business. Also, multiple queries
can now run in parallel, as many as needed. In our case, querying an
individual device actor is a fast operation, but if this were not the
case (for example, because the remote sensors need to be contacted over
the network), this design would significantly improve throughput.
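The isolation argument can be illustrated without Akka. The following is a minimal sketch, assuming hypothetical classes Group and Query that stand in for the group actor and the query actor: each Query owns its pending set and collected replies, while Group holds nothing except its device list. None of these names come from the guide's code, and real actors would exchange messages instead of calling methods.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, Akka-free sketch: per-query state lives in its own object.
final class QueryIsolationSketch {
    /** Plays the role of one query actor; owns all state for a single request. */
    static final class Query {
        final long requestId;
        private final Set<String> stillWaiting;
        private final Map<String, Double> repliesSoFar = new HashMap<>();

        Query(long requestId, Set<String> deviceIds) {
            this.requestId = requestId;
            // Snapshot of the devices known when the query started
            this.stillWaiting = new HashSet<>(deviceIds);
        }

        /** Records one reading; returns true once every device has answered. */
        boolean onReply(String deviceId, double value) {
            if (stillWaiting.remove(deviceId)) {
                repliesSoFar.put(deviceId, value);
            }
            return stillWaiting.isEmpty();
        }

        /** Returns a copy of the readings collected so far. */
        Map<String, Double> replies() {
            return new HashMap<>(repliesSoFar);
        }
    }

    /** Plays the role of the group actor; keeps no per-query state at all. */
    static final class Group {
        private final Set<String> deviceIds = new HashSet<>();

        void track(String deviceId) {
            deviceIds.add(deviceId);
        }

        Query startQuery(long requestId) {
            return new Query(requestId, deviceIds);
        }
    }

    public static void main(String[] args) {
        Group group = new Group();
        group.track("device1");
        group.track("device2");
        // Two queries in flight at once, each with independent progress
        Query first = group.startQuery(1L);
        Query second = group.startQuery(2L);
        first.onReply("device1", 1.0);
        second.onReply("device1", 1.5);
        second.onReply("device2", 2.5);
        System.out.println("first has " + first.replies().size() + " replies");
        System.out.println("second has " + second.replies().size() + " replies");
    }
}
```

Because the group only hands each query a snapshot of its devices, it never needs bookkeeping to tell apart replies that belong to different requests.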
We close this chapter by testing that everything works together. This | |
test is just a variant of the previous ones, now exercising the group | |
query feature: | |
Scala | |
"be able to collect temperatures from all active devices" in { | |
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))
  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor1 = probe.lastSender
  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor2 = probe.lastSender
  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor3 = probe.lastSender
  // Check that the device actors are working
  deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
  deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
  // No temperature for device3
  groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref)
  probe.expectMsg(
    DeviceGroup.RespondAllTemperatures(
      requestId = 0,
      temperatures = Map(
        "device1" -> DeviceGroup.Temperature(1.0),
        "device2" -> DeviceGroup.Temperature(2.0),
        "device3" -> DeviceGroup.TemperatureNotAvailable)))
} | |
Java | |
@Test
public void testCollectTemperaturesFromAllActiveDevices() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));
  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor1 = probe.getLastSender();
  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor2 = probe.getLastSender();
  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor3 = probe.getLastSender();
  // Check that the device actors are working
  deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
  assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
  assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  // No temperature for device3
  groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef());
  DeviceGroup.RespondAllTemperatures response = probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(0L, response.requestId);
  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
  expectedTemperatures.put("device3", new DeviceGroup.TemperatureNotAvailable());
  assertEqualTemperatures(expectedTemperatures, response.temperatures);
}
Summary | |
In the context of the IoT system, this guide introduced the following
concepts, among others:
- The hierarchy of actors and their lifecycle | |
- The importance of designing messages for flexibility | |
- How to watch and stop actors, if necessary | |
What’s Next? | |
To continue your journey with Akka, we recommend: | |
- Start building your own applications with Akka, and get involved in
our amazing community for help if you get stuck.
- If you’d like some additional background, read the rest of the
reference documentation and check out some of the books and videos
on Akka.
LINKS TO MORE AKKA DOCUMENTATION | |
* Akka Documentation: http://akka.io/docs/ | |
* Try Akka: http://akka.io/try-akka/ | |
* Akka Commercial Addons: http://developer.lightbend.com/docs/akka-commercial-addons/current/ | |
* Alpakka - Akka Streams Connectors: http://developer.lightbend.com/docs/alpakka/current/ | |
* Akka Streams connector for Apache Kafka: http://doc.akka.io/docs/akka-stream-kafka/current/home.html | |
* Akka (Cluster) Management: http://developer.lightbend.com/docs/akka-management/current/ | |
* Cassandra Plugins for Akka Persistence: https://github.com/akka/akka-persistence-cassandra | |
* DynamoDBJournal for Akka Persistence: https://github.com/akka/akka-persistence-dynamodb | |
* Samples and guides: http://developer.lightbend.com/start/?group=akka |