-
-
Save Roiocam/aabbf404aa2e1a4e967967e450b0def0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.actor._ | |
import akka.stream.Materializer | |
import akka.stream.SystemMaterializer | |
import akka.stream.scaladsl.Source | |
import akka.testkit.TestProbe | |
import org.openjdk.jmh.annotations.Scope | |
import org.openjdk.jmh.annotations._ | |
import java.util.concurrent.TimeUnit | |
import scala.concurrent.Await | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.Future | |
import scala.concurrent.duration._ | |
import scala.util.Success | |
import scala.util.Try | |
/* | |
# OS: macOS 13.3 | |
# CPU: Apple M1 CPU @ 2.40GHz | |
# Date: Thu Sep 14 10:19:13 CST 2023 | |
[info] # JMH version: 1.36 | |
[info] # VM version: JDK 11.0.17, OpenJDK 64-Bit Server VM, 11.0.17+8-LTS | |
[info] # VM options: -XX:+UseG1GC -Xms3g -Xmx3g -Xss2m -XX:+PerfDisableSharedMem -XX:MaxGCPauseMillis=300 -XX:MaxDirectMemorySize=256m -Djava.security.egd=file:/dev/./urandom | |
[info] Benchmark Mode Cnt Score Error Units | |
[info] JdbcAsyncJournalBenchmark.originalAsync thrpt 3 590394.058 ± 38433.920 ops/s | |
[info] JdbcAsyncJournalBenchmark.streamWithCollect thrpt 3 594978.499 ± 2034.776 ops/s | |
[info] Benchmark Mode Cnt Score Error Units | |
[info] JdbcAsyncJournalBenchmark.originalAsync thrpt 3 584438.653 ± 35769.421 ops/s | |
[info] JdbcAsyncJournalBenchmark.streamWithCollect thrpt 3 587042.327 ± 15553.554 ops/s | |
*/ | |
@State(Scope.Benchmark) | |
@OutputTimeUnit(TimeUnit.SECONDS) | |
@BenchmarkMode(Array(Mode.Throughput)) | |
class JdbcAsyncJournalBenchmark { | |
var system: ActorSystem = _ | |
var probe: TestProbe = _ | |
var mockReceiver: ActorRef = _ | |
var messagesWithBatchMock: Source[Try[Long], NotUsed] = _ | |
implicit var mat : Materializer = _ | |
val MAX_ELEMENT: Long = 10_000L | |
@Setup | |
def setup(): Unit = { | |
system = ActorSystem("test") | |
probe = TestProbe()(system) | |
val props = Props(new MockReceiver()) | |
mockReceiver = system.actorOf(props, "mockReceiver") | |
mat = SystemMaterializer(system).materializer | |
val initial: Long = 0L | |
val increment: Long => Option[(Long, Seq[Try[Long]])] = prev => { | |
Thread.sleep(5) // mock database response. | |
if (prev >= MAX_ELEMENT) None | |
else { | |
val next = prev + 1 | |
val elements = Iterator.range(next, next + 400L).map(e => Try(e)).toSeq | |
Some((next + 400, elements)) | |
} | |
} | |
messagesWithBatchMock = Source.unfold[Long, Seq[Try[Long]]](initial)(increment).mapConcat(identity) | |
} | |
@TearDown | |
def shutdown(): Unit = { | |
system.terminate() | |
Await.ready(system.whenTerminated, 15.seconds) | |
} | |
@Benchmark | |
@OperationsPerInvocation(100000) | |
def originalAsync(): Unit = { | |
messagesWithBatchMock | |
.take(Long.MaxValue) | |
.mapAsync(1)(reprAndOrdNr => Future.fromTry(reprAndOrdNr)) | |
.runForeach(repr => mockReceiver ! repr) | |
.map(_ => ()) | |
probe.expectMsg(FiniteDuration(10, TimeUnit.SECONDS), MAX_ELEMENT) | |
} | |
@Benchmark | |
@OperationsPerInvocation(100000) | |
def streamWithCollect(): Unit = { | |
messagesWithBatchMock | |
.take(Long.MaxValue) | |
.collect { case Success(reprAndOrdNr) => reprAndOrdNr } | |
.runForeach(repr => mockReceiver ! repr) | |
.map(_ => ()) | |
probe.expectMsg(FiniteDuration(10, TimeUnit.SECONDS), MAX_ELEMENT) | |
} | |
class MockReceiver extends Actor { | |
def receive: Receive = { | |
case n: Long => | |
if (n == MAX_ELEMENT) { | |
probe.ref ! n | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment