Skip to content

Instantly share code, notes, and snippets.

@Roiocam
Created September 14, 2023 02:47
Show Gist options
  • Save Roiocam/aabbf404aa2e1a4e967967e450b0def0 to your computer and use it in GitHub Desktop.
Save Roiocam/aabbf404aa2e1a4e967967e450b0def0 to your computer and use it in GitHub Desktop.
import akka.NotUsed
import akka.actor._
import akka.stream.Materializer
import akka.stream.SystemMaterializer
import akka.stream.scaladsl.Source
import akka.testkit.TestProbe
import org.openjdk.jmh.annotations.Scope
import org.openjdk.jmh.annotations._
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Try
/*
# OS: macOS 13.3
# CPU: Apple M1 CPU @ 2.40GHz
# Date: Thu Sep 14 10:19:13 CST 2023
[info] # JMH version: 1.36
[info] # VM version: JDK 11.0.17, OpenJDK 64-Bit Server VM, 11.0.17+8-LTS
[info] # VM options: -XX:+UseG1GC -Xms3g -Xmx3g -Xss2m -XX:+PerfDisableSharedMem -XX:MaxGCPauseMillis=300 -XX:MaxDirectMemorySize=256m -Djava.security.egd=file:/dev/./urandom
[info] Benchmark Mode Cnt Score Error Units
[info] JdbcAsyncJournalBenchmark.originalAsync thrpt 3 590394.058 ± 38433.920 ops/s
[info] JdbcAsyncJournalBenchmark.streamWithCollect thrpt 3 594978.499 ± 2034.776 ops/s
[info] Benchmark Mode Cnt Score Error Units
[info] JdbcAsyncJournalBenchmark.originalAsync thrpt 3 584438.653 ± 35769.421 ops/s
[info] JdbcAsyncJournalBenchmark.streamWithCollect thrpt 3 587042.327 ± 15553.554 ops/s
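# NOTE: the scores above appear to have been recorded with @OperationsPerInvocation(100000), while one
# invocation streams only about 10,000 elements (up to MAX_ELEMENT); per-element throughput is therefore
# roughly one tenth of the reported ops/s. The relative comparison between the two variants is unaffected.
*/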
/**
 * JMH throughput benchmark comparing two ways of unwrapping a stream of `Try[Long]` elements
 * before forwarding them to an actor:
 *
 *  - [[originalAsync]]: `mapAsync(1)` combined with `Future.fromTry`
 *  - [[streamWithCollect]]: `collect { case Success(x) => x }`
 *
 * Every element produced by the mock source is a `Success`, so both variants forward the same
 * values here. (In general they differ: `collect` drops non-`Success` elements, whereas a failed
 * future inside `mapAsync` fails the stream under the default supervision strategy.)
 *
 * One benchmark invocation materializes the mock source once, pushes `MAX_ELEMENT` elements to
 * `mockReceiver`, and blocks until the receiver reports the final element through `probe`.
 */
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class JdbcAsyncJournalBenchmark {

  // All of the following are assigned in `setup()`.
  var system: ActorSystem = _
  var probe: TestProbe = _
  var mockReceiver: ActorRef = _
  var messagesWithBatchMock: Source[Try[Long], NotUsed] = _
  implicit var mat: Materializer = _

  /**
   * Value of the last element emitted per stream run; also the number of elements per run.
   * Must stay in sync with the `@OperationsPerInvocation` values on the benchmark methods.
   */
  val MAX_ELEMENT: Long = 10_000L

  /**
   * Creates the actor system, the probe, the receiving actor and the reusable source blueprint.
   *
   * The source emulates batched database reads: every `unfold` step sleeps 5 ms and then yields
   * one batch of consecutive numbers, until `MAX_ELEMENT` has been emitted.
   */
  @Setup
  def setup(): Unit = {
    system = ActorSystem("test")
    probe = TestProbe()(system)
    val props = Props(new MockReceiver())
    mockReceiver = system.actorOf(props, "mockReceiver")
    mat = SystemMaterializer(system).materializer

    val batchSize: Long = 400L
    val initial: Long = 0L
    // The unfold state is the LAST value already emitted. Using it (rather than "last + 1") as
    // the next state keeps the batches contiguous: 1..400, 401..800, ..., 9601..10000, so the
    // stream emits exactly MAX_ELEMENT elements and MAX_ELEMENT itself is the final one.
    val nextBatch: Long => Option[(Long, Seq[Try[Long]])] = lastEmitted => {
      Thread.sleep(5) // mock database response.
      if (lastEmitted >= MAX_ELEMENT) None
      else {
        val first = lastEmitted + 1
        val last = lastEmitted + batchSize
        // `Iterator.range` excludes its upper bound, hence `last + 1`.
        val elements = Iterator.range(first, last + 1).map(e => Try(e)).toSeq
        Some((last, elements))
      }
    }
    messagesWithBatchMock =
      Source.unfold[Long, Seq[Try[Long]]](initial)(nextBatch).mapConcat(identity)
  }

  /** Terminates the actor system and waits (up to 15 seconds) for termination to complete. */
  @TearDown
  def shutdown(): Unit = {
    Await.ready(system.terminate(), 15.seconds)
  }

  /**
   * Unwraps each `Try` via `mapAsync(1)` + `Future.fromTry`, sends every value to
   * `mockReceiver`, then blocks until the probe receives `MAX_ELEMENT` (10 second timeout).
   */
  @Benchmark
  @OperationsPerInvocation(10000) // one operation per streamed element; equals MAX_ELEMENT
  def originalAsync(): Unit = {
    messagesWithBatchMock
      .take(Long.MaxValue)
      .mapAsync(1)(reprAndOrdNr => Future.fromTry(reprAndOrdNr))
      .runForeach(repr => mockReceiver ! repr)
      .map(_ => ())
    // Completion is observed through the probe, not through the (discarded) stream future.
    probe.expectMsg(10.seconds, MAX_ELEMENT)
  }

  /**
   * Unwraps each `Try` via `collect` on `Success`, sends every value to `mockReceiver`,
   * then blocks until the probe receives `MAX_ELEMENT` (10 second timeout).
   */
  @Benchmark
  @OperationsPerInvocation(10000) // one operation per streamed element; equals MAX_ELEMENT
  def streamWithCollect(): Unit = {
    messagesWithBatchMock
      .take(Long.MaxValue)
      .collect { case Success(reprAndOrdNr) => reprAndOrdNr }
      .runForeach(repr => mockReceiver ! repr)
      .map(_ => ())
    // Completion is observed through the probe, not through the (discarded) stream future.
    probe.expectMsg(10.seconds, MAX_ELEMENT)
  }

  /**
   * Sink actor for the benchmark streams: ignores every `Long` except `MAX_ELEMENT`, which it
   * forwards to the probe to signal that the final element of a run has arrived.
   */
  class MockReceiver extends Actor {
    def receive: Receive = {
      case n: Long =>
        if (n == MAX_ELEMENT) {
          probe.ref ! n
        }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment