🗻
Life is Study!

Konrad `ktoso` Malawski ktoso


Made for a purpose - API design in Akka over the last 9 years

(New talk, and I think it could be very exciting. We made numerous tradeoffs: some bad, which we backtracked on, and some good, which form the core of Akka to this day. It's mostly about design tradeoffs, but on specific cases that could be interesting to debate with others afterwards :-))

In this talk we'll discuss several APIs and their evolution, from successful design decisions to dead ends that had to be removed. Over its lifetime Akka has seen multiple core APIs lead vastly different lives: the schedulers, the core Actor APIs for creation, and of course the message-sending APIs, which saw the most evolution and grew more minimal and precise as time went by. And of course the evolution of the Akka Streams and Reactive Streams APIs/SPIs, which went through numerous iterations before reaching a stable API that we were confident would

Akka Artery Remoting internals: ActorRef & Manifest Compression

In this blog post we'll dive into one of the many optimisations that the new Akka Remoting (codenamed Artery, see docs here) brings to the table.

This optimisation targets a specific communication pattern:

  • chatty Actors, which send a lot of messages
  • messages which also happen to be rather small
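
To make the motivation concrete: when the payload is tiny, a long ActorRef path or serializer manifest string repeated in every envelope can dominate what goes over the wire. The excerpt does not show how Artery implements its compression, so the following is only a minimal sketch of the general idea (replace a frequently repeated string with a small integer id). The class `CompressionTableSketch`, its methods, and the example path are hypothetical and are not part of Akka's API; a real implementation also has to keep the sender's and receiver's tables in agreement, which this single shared table ignores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; these are not Artery's actual classes. */
public class CompressionTableSketch {
    // value -> compact id, used on the sending side
    private final Map<String, Integer> idsByValue = new HashMap<>();
    // compact id -> value, used on the receiving side
    private final List<String> valuesById = new ArrayList<>();

    /** Returns the id for a value, assigning the next free id the first time it is seen. */
    public int compress(String value) {
        Integer known = idsByValue.get(value);
        if (known != null) {
            return known;
        }
        int id = valuesById.size();
        idsByValue.put(value, id);
        valuesById.add(value);
        return id;
    }

    /** Looks up the original value for a previously assigned id. */
    public String decompress(int id) {
        return valuesById.get(id);
    }

    public static void main(String[] args) {
        CompressionTableSketch table = new CompressionTableSketch();
        // Assumed example path: long compared to a small message body.
        String ref = "akka://sys@host:25520/user/parent/child/worker";
        int id = table.compress(ref);
        System.out.println(ref.length() + " chars become id " + id
            + ", which maps back to " + table.decompress(id));
    }
}
```

The deeper the actor hierarchy, the longer the path string, so the saving per message grows while the id stays a fixed-size integer.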

With small messages, but deep hierarchies the

[info] Result "scala.future.TransformationBenchmark.transformation_8192":
[info] 0.039 ±(99.9%) 0.003 us/op [Average]
[info] (min, avg, max) = (0.036, 0.039, 0.044), stdev = 0.003
[info] CI (99.9%): [0.036, 0.041] (assumes normal distribution)
[info]
[info]
[info] # Run complete. Total time: 01:16:32
[info]
[info] Benchmark                       (impl)  (result)   Mode  Cnt   Score   Error   Units
[info] CallbackBenchmark.onComplete_1  stdlib       N/A  thrpt   20  14.141 ± 0.253  ops/us

Macbook Pro (Retina, late 2013), 16G, Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz

Usual desktop applications running, browsers, music etc.

after clean, sbt test:compile

real	 8m33.910s
user	21m11.287s
sys 0m45.226s
v10.0.3 / 2017-01-26
====================
* Merge pull request #835 from jrudolph/jr/w/738-client-side-documentation
* =doc #738 revamp client-side host-level and connection-level examples + docs
* Merge pull request #822 from jrudolph/jr/w/better-seal-scaladoc
* =doc scalariform fix
* Merge remote-tracking branch 'origin/pull/803' (PR #803)
* Provide Rejection Documentation #593
public <A> Flow<A, A, NotUsed> debounceSelect(FiniteDuration interval, Function<List<A>, A> pick, int max) {
  return Flow.<A>create().groupedWithin(max, interval).map(group -> pick.apply(group));
}

static class RefreshSignal { public RefreshSignal(Object wat) {} }

final SourceQueueWithComplete<RefreshSignal> queue =
  // give it some buffer space, but we can drop signals anyway:
  // if we have at least one, it means we want to trigger one refresh
  Source.<RefreshSignal>queue(4, OverflowStrategy.dropHead())
    // picking any of the refresh signals
    .via(debounceSelect(FiniteDuration.create(1, TimeUnit.SECONDS), it -> it.get(0), 100))
    .to(Sink.actorRef(target, new RefreshSignal("done")))
/**
 * Groups items within a given time interval (unless the max size is reached, then earlier),
 * and picks the single element to signal downstream using the provided `pick` function.
 */
public <A> Flow<A, A, NotUsed> debounceSelect(FiniteDuration interval, Function<List<A>, A> pick, int max) {
  return Flow.<A>create().groupedWithin(max, interval).map(group -> pick.apply(group));
}

class RefreshSignal {
  public RefreshSignal(Object a) {}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.Materializer
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}
final case class ResponseTimeoutException(request: HttpRequest, message: String) extends RuntimeException(message)
127 rst/additional/books.rst
1162 rst/additional/faq.rst
12 rst/additional/index.rst
22 rst/additional/language-bindings.rst
1208 rst/additional/osgi.rst
974 rst/common/binary-compatibility-rules.rst
606 rst/common/circuitbreaker.rst
2530 rst/common/cluster.rst
283 rst/common/duration.rst
646 rst/dev/building-akka.rst
implicit class flowTweaks[M](val wsInput: Source[Message, M]) {
  def forceTextStrict: Source[Strict, M] = wsInput
    .collect {
      case TextMessage.Strict(text) ⇒ Future.successful(text)
      case TextMessage.Streamed(textStream) ⇒ textStream.runFold("")(_ + _)
      case BinaryMessage.Strict(binary) ⇒ Skip
      case BinaryMessage.Streamed(binaryStream) ⇒ binaryStream.runWith(Sink.ignore); Skip
    }
    .filterNot(_ == Skip)
    .mapAsync(1)(ConstantFun.scalaIdentityFunction)