In this blog post we'll dive into one of the many optimisations that the new Akka Remoting (codenamed Artery, see docs here) brings to the table.
This optimisation targets a specific communication pattern:
- chatty Actors - which send a lot of messages
- which also happen to be rather small
With small messages but deep actor hierarchies, the envelope overhead can easily dominate the bytes we put on the wire.
In Artery, we have re-designed the binary message envelope representation, which is now highly write/read-optimised and space-efficient. The size overhead of the envelope has a significant impact on message throughput when small messages are being sent, since in that case the envelope may make up a large percentage of the entire payload that we need to write onto the wire.
Among other things, the envelope contains information about the recipient of the message, and which serializer should be used to deserialise the message. Let's focus on the recipient path for now.
When you send a message to a remote Actor in Akka, the Remoting infrastructure attaches the address of the intended recipient, the address of the sender (to provide the sender(): ActorRef on the receiving side) and some additional information, and puts it all onto the wire. So a simple call like:
val lightBulb: ActorRef = ???
lightBulb ! "Hello"
eventually ends up being serialised, using a custom optimised binary format, as an Akka message envelope containing the additional information. The message itself is serialised using whichever serializer the user has configured (TODO link to docs here).
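To make that a bit more tangible, here is a purely illustrative sketch of the kind of information such an envelope has to carry; this is not Artery's actual envelope class or binary layout:

```scala
// Purely illustrative sketch - NOT Artery's actual envelope classes or binary layout.
final case class EnvelopeSketch(
  recipient: String,   // full path of the target actor, e.g. "akka://system@host:port/user/..."
  sender: String,      // full path of the sending actor, so sender() works on the remote side
  serializerId: Int,   // identifies the serializer to use when deserialising the payload
  manifest: String,    // optional string manifest some serializers require
  payload: Array[Byte] // the serialized message itself
)
```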
Let's say that the lightbulb actor lives on machine 10.0.0.1, and its full address is akka://[email protected]:25520/user/controls/lightbulbs/xs-1337, which is 68 characters long. That seems to be a pretty common path length, though we've certainly seen systems with paths reaching thousands of characters, especially if the path includes identifiers of the many sub-systems being modelled. So generally speaking we're often looking at hundreds of bytes used in each message just to carry around sender/receiver and manifest information. With small messages, and especially when efficient binary serializers (such as Protobuf, Kryo, SBE, or others) are used, this information may completely dominate the size of the message.
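If you are curious how long the paths in your own system are, you can inspect them straight from an ActorRef; a tiny helper for that (assuming the lightBulb reference from the snippet above) might look like this:

```scala
import akka.actor.ActorRef

// Illustrative only: print the full (serialization-format) path of a reference and its length.
def printPathOverhead(ref: ActorRef): Unit = {
  val fullPath = ref.path.toSerializationFormat // may additionally include a "#uid" suffix
  println(s"$fullPath (${fullPath.length} characters)")
}

// printPathOverhead(lightBulb)
```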
In our example we just send a simple String, which is encoded directly as a byte array in the message (also so that we can easily spot it in our hex dump). Generally though, binary serializers should be used with Akka, and especially with the high-throughput Artery remoting implementation.
The message could be efficiently serialised (TODO) in as little as 1 byte without much hassle. The entire payload of the remote message will therefore add up to exactly (TODO) 162 bytes.
TODO image of the header format
The goal of compression is to avoid carrying these Strings around with every message sent: instead, a single number is advertised once for a given path, and that number is used from then on. For instance, the message we've seen in the previous section will be compressed to exactly 42 bytes, with no explicit string literals being encoded at all once the compression advertisement has run. For a side-by-side comparison, refer to the screenshot below showing the serialised forms with and without compression enabled:
So thanks to the ActorRef compression we were able to reduce the message size by 74% - from 162 to 42 bytes (sic!). The message itself was left untouched and still is 5 bytes (48 65 6C 6C 6F), so the overhead, expressed as the ratio of the envelope size to the message size, went from 97% down to a much nicer 86%. That may sound less impressive, however one should note that the envelope won't grow any further and stays at this size, while the messages sent between those Actors may well be larger.
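Conceptually, a compression table is nothing more than a versioned mapping between full string values and small integer identifiers. A minimal sketch of the idea (not Artery's actual, far more optimised data structures) could look like this:

```scala
// Illustrative sketch of the idea behind compression tables - not Artery's actual implementation.
final case class CompressionTableSketch(version: Int, mappings: Map[String, Int]) {
  // sending side: replace the full path with a small identifier, if we have learned one
  def compress(path: String): Option[Int] = mappings.get(path)
}

final case class DecompressionTableSketch(version: Int, values: Array[String]) {
  // receiving side: turn the small identifier back into the full path
  def decompress(id: Int): String = values(id)
}
```

Since the sender only ever uses identifiers it has learned from a table advertised by the receiver, both sides always agree on what a given number means.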
Additional data such as String manifests for serialisers are also compressed to single numeric values instead of keeping around their string representation in each message sent.
In addition, special ActorRefs such as deadLetters are always compressed, since they are rather common in fire-and-forget interactions.
One of the main design goals of compression, besides decreasing message size, is to be absolutely safe in the face of node restarts, split-brain scenarios or other network problems.
To achieve this, the algorithm is "started" at the receiving end of a connection. It is the receiver that detects which values are seen most often and need to be compressed. It then advertises a compression table it has prepared locally to the remote node, which then begins to use the new compression identifiers instead of the full string representations of the values.
This approach allows us to trivially be resilient against the remote node crashing and restarting, and thus "forgetting" the compression tables. A restarted sender node would simply resume operation without compression, which can always be understood by the receiving node. The receiving node would simply advertise the compression table again, and thus resume operation with the optimised message sizes. So the algorithm is self-healing and does not need any special casing or handling to resume efficient operation.
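To make the flow a bit more concrete, here is a rough, heavily simplified sketch of what the sending side of this scheme might look like; the message and method names are invented for illustration and are not Artery's actual internal API:

```scala
// Hypothetical, simplified sketch - names are invented, not Artery's internal protocol.
final case class AdvertiseCompressionTable(version: Int, mappings: Map[String, Int])

// Sending side: starts without any table, so it can always fall back to full strings.
class SenderCompressionSketch {
  private var table: Map[String, Int] = Map.empty

  // applied whenever the receiving side advertises a (new) table
  def onAdvertisement(adv: AdvertiseCompressionTable): Unit =
    table = adv.mappings

  // Right(id) when we can compress, Left(fullPath) when we (still) cannot;
  // the receiver understands both forms, which is what makes restarts harmless.
  def encodeRecipient(fullPath: String): Either[String, Int] =
    table.get(fullPath).toRight(fullPath)
}
```

The key property is that the sender never needs compression to make progress: a missing or outdated table only costs some bandwidth, never correctness.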
In order to apply compression to the most significant (most "chatty") Actors (and manifests), we need to employ some form of heavy hitter detection, which in turn requires knowing how many times a given actor has sent a message. What complicates matters is that a single node may easily host millions of Actors, so we cannot afford to store a counter for each Actor just to track whether it is "heavy" or not.
Hence we turn to a well-known approximate data structure called Count-Min Sketch (TODO link, whitepaper). In short, a CMS allows us to store approximate counts for our actor "hits" in sub-linear space.
TODO calculate the specific data / size.
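For readers unfamiliar with the structure, here is a minimal, illustrative Count-Min Sketch; Artery's actual implementation is considerably more optimised, but the principle is the same: several rows of counters, each indexed by a different hash function, where the estimate is the minimum across all rows.

```scala
import scala.util.hashing.MurmurHash3

// Minimal illustrative Count-Min Sketch - not Artery's actual, heavily optimised implementation.
final class CountMinSketchSketch(width: Int, depth: Int) {
  private val counts = Array.ofDim[Long](depth, width)

  // derive `depth` different hash functions by seeding MurmurHash3 with the row index
  private def bucket(item: AnyRef, row: Int): Int =
    math.abs(MurmurHash3.stringHash(item.toString, row) % width)

  /** Record one occurrence of `item` and return its (over-)estimated count so far. */
  def addAndEstimate(item: AnyRef): Long = {
    var row = 0
    var estimate = Long.MaxValue
    while (row < depth) {
      val col = bucket(item, row)
      counts(row)(col) += 1
      estimate = math.min(estimate, counts(row)(col))
      row += 1
    }
    estimate
  }
}
```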
We assume the Cash Register model, in which there are only additions; in other words, the counter for messages sent to/from a given ActorRef can only ever grow. This simplifies heavy hitter detection significantly: we only need to keep track of the "lowest count among the current heavy hitters", since an increment can only:
- increment the count of an entry that is not a heavy hitter, and remain below the cut-off. In this situation we don't need to keep the actual value in memory; we just keep its approximate count in the CMS.
- increment the count of an entry that is not a heavy hitter, and exceed the cut-off. This causes the entry to be stored in memory, as we will want to include it in compression advertisements from here on.
- increment the count of an existing heavy hitter, which is effectively a no-op unless the incremented entry was the lowest heavy hitter, in which case we need to find the new lowest one, since it defines the cut-off point at which entries become heavy hitters.
FIXME, above needs re-reading.
TODO explain why we use the heap.
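As a rough illustration of the above (and of why a heap is handy here), this sketch assumes a fixed maximum number of heavy hitters and approximate counts coming from the Count-Min Sketch shown earlier; names and details are simplified compared to Artery's internals:

```scala
import scala.collection.mutable

// Simplified sketch of heavy hitter tracking - not Artery's actual data structure.
// Exact entries are kept only for the current heavy hitters; everything else lives
// solely as an approximate count inside the Count-Min Sketch.
final class HeavyHittersSketch(maxHitters: Int) {
  // min-heap by count: the head is always the "lowest heavy hitter", i.e. the cut-off
  private val heap =
    mutable.PriorityQueue.empty[(Long, String)](Ordering.by[(Long, String), Long](_._1).reverse)
  private val members = mutable.Set.empty[String]

  private def lowestCount: Long = if (heap.size < maxHitters) 0L else heap.head._1

  /** Returns true if `path` is (now) a heavy hitter and a candidate for advertisement. */
  def update(path: String, estimatedCount: Long): Boolean =
    if (members.contains(path)) {
      true // already a hitter; a full implementation would also refresh its position in the heap
    } else if (estimatedCount > lowestCount) {
      if (heap.size >= maxHitters) {
        val (_, evicted) = heap.dequeue() // drop the previous lowest hitter to make room
        members -= evicted
      }
      heap.enqueue((estimatedCount, path))
      members += path
      true
    } else {
      false // below the cut-off: only the approximate count in the CMS is kept
    }
}
```

In this sketch, keeping the candidates in a min-heap keyed by count makes "what is the current lowest heavy hitter?" a constant-time question, which is exactly the comparison every sampled message has to make.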
Additionally, we are able to dynamically sample more or less data to include in the heavy hitter detection, depending on the rate of incoming messages.
In the incoming-message Akka Stream (which Artery is implemented with), one of the first stages processing the messages is the Decoder stage. Since it can trivially know the rate of incoming messages per second, we feed this information back into our adaptive sampling mechanism. Initially every message sent is tracked, however if the rate exceeds a specific threshold (e.g. 1000 messages per second), the dynamic adaptive sampling kicks in and the HH counters are only incremented for every n-th message. The adaptive sampling keeps adjusting as incoming traffic grows, so with 100,000 messages per second we would be recording every m-th message, where m >> n. Once the traffic slows down, the sampling rate is increased again, so the mechanism self-regulates in either direction.
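A simplified sketch of such rate-adaptive sampling (the concrete thresholds and step sizes below are made up; the real implementation adapts more smoothly):

```scala
// Illustrative sketch of rate-adaptive sampling - thresholds and step sizes are invented.
final class AdaptiveSamplingSketch {
  private var everyNth: Int = 1 // start by recording every message
  private var counter: Long = 0L

  /** Called by the decoding stage periodically with the observed message rate. */
  def adjust(messagesPerSecond: Long): Unit = {
    everyNth =
      if (messagesPerSecond > 100000L) 100   // very hot connection: record only every 100th message
      else if (messagesPerSecond > 1000L) 10 // hot connection: record every 10th message
      else 1                                 // low traffic: record everything again
  }

  /** Returns true if this message should be counted towards heavy hitter detection. */
  def shouldSample(): Boolean = {
    counter += 1
    counter % everyNth == 0
  }
}
```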
The optimisation described here is really just a tiny piece of the optimisations and tricks implemented in Artery to make Actor communication fast, (almost) allocation-free and still completely transparent to end users – it still is as simple as actor ! message.
In the next posts in this series we'll dive into other optimisations, custom data structures and algorithms we implemented in Artery. We hope you'll enjoy this view into the internals of a modern ...
List other optimisations:
- TODO
- TODO
- TODO
- TODO
- TODO