Created
August 8, 2024 16:52
-
-
Save Ichoran/d971d086823964b0353eac307b69f006 to your computer and use it in GitHub Desktop.
Benchmarking Cats Effect 3 mutex vs. Java Loom with java.util.concurrent.Semaphore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2020-2024 Typelevel | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
* | |
* Modified by Rex Kerr (all rights for this test code ceded to Typelevel). | |
*/ | |
//> using scala 3.5.0-RC6 | |
//> using dep com.github.ichoran::kse3-flow:0.3.7 | |
//> using dep com.github.ichoran::kse3-maths:0.3.7 | |
//> using dep org.typelevel::cats-core:2.12.0 | |
//> using dep org.typelevel::cats-effect:3.5.4 | |
// Run with scala-cli --power --jmh --jvm=21 MutexBench.scala | |
// If you change the classes that have to be benchmarked, you may have to rm -r .scala_build | |
package cats.effect.benchmarks | |
import cats.effect.IO | |
import cats.effect.std._ | |
import cats.effect.unsafe.implicits.global | |
import org.openjdk.jmh.annotations._ | |
import org.openjdk.jmh.infra.Blackhole | |
import java.util.concurrent.TimeUnit | |
/** JMH throughput benchmarks that pit Cats Effect's `Mutex` and `AtomicCell` against a fair
  * `java.util.concurrent.Semaphore` driven through kse `Fu` tasks.
  *
  * Every benchmark performs `acquires` guarded operations per invocation, split across `fibers`
  * concurrent tasks.  Three work shapes are measured:
  *
  *   - '''Complete''': start `fibers` tasks that each do one guarded operation and finish;
  *     repeat that round `acquires / fibers` times.
  *   - '''Transpose''': start `fibers` tasks once; each does `acquires / fibers` guarded
  *     operations in sequence.
  *   - '''Killed''': like Complete, but each task is cancelled (Cats Effect) or interrupts its
  *     own thread (semaphore variant).
  *
  * `@OperationsPerInvocation` must stay equal to `acquires`, otherwise the reported ops/us
  * figures are scaled incorrectly.
  */
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Warmup(iterations = 10, time = 2000, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 4000, timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(10000)
@Fork(2)
@Threads(1)
class MutexBenchmark:
  import cats.syntax.all.*
  import kse.basics.*
  import kse.flow.*

  /** Number of concurrent tasks per round; injected by JMH (must be a `var` for `@Param`). */
  @Param(Array("1", "5", "20", "100"))
  var fibers: Int = -1

  //@Param(Array("10000"))
  //var acquires: Int = -1

  /** Total guarded operations per benchmark invocation.
    * Every `@Param` value of `fibers` divides this evenly, so `acquires / fibers` loses nothing.
    */
  val acquires = 10000

  /** Fails the benchmark if the counter did not reach exactly `acquires`.
    *
    * @param n counter incremented once per guarded operation
    * @throws Exception when the observed count differs from `acquires`
    */
  private def checkCount(n: Atom.Count): Unit =
    if n() != acquires then
      throw new Exception(s"Got ${n()} updates and expected $acquires")

  // ---------------------------------------------------------------------------------------
  // Cats Effect Mutex
  // ---------------------------------------------------------------------------------------

  /** "Complete" shape: each round runs `fibers` parallel lock/increment/unlock operations.
    *
    * @param mutex effect that allocates the mutex shared by all operations
    * @param n     counter incremented while the lock is held
    */
  private def mutexCompleteImpl(mutex: IO[Mutex[IO]], n: Atom.Count): IO[Unit] =
    if fibers == 1 then
      // Single fiber: plain sequential replication, no parallel machinery involved.
      mutex.flatMap: m =>
        m.lock.use{ _ => IO(n.++) }.replicateA_(acquires)
    else
      mutex.flatMap: m =>
        m.lock.use{ _ => IO(n.++) }.parReplicateA_(fibers)
          .replicateA_(acquires/fibers)

  @Benchmark
  def mutexComplete(): Unit =
    val n = Atom.Count()
    mutexCompleteImpl(mutex = Mutex.apply, n = n).unsafeRunSync()
    checkCount(n)

  /** "Transpose" shape: `fibers` parallel loops, each doing `acquires / fibers` guarded
    * increments in sequence.
    *
    * @param mutex effect that allocates the mutex shared by all operations
    * @param n     counter incremented while the lock is held
    */
  private def mutexTransposeImpl(mutex: IO[Mutex[IO]], n: Atom.Count): IO[Unit] =
    if fibers == 1 then
      mutex.flatMap: m =>
        m.lock.use{ _ => IO(n.++) }.replicateA_(acquires)
    else
      mutex.flatMap: m =>
        m.lock.use{ _ => IO(n.++) }.replicateA_(acquires/fibers)
          .parReplicateA_(fibers)

  @Benchmark
  def mutexTranspose(): Unit =
    val n = Atom.Count()
    mutexTransposeImpl(mutex = Mutex.apply, n = n).unsafeRunSync()
    checkCount(n)

  /** "Killed" shape: while the lock is held, start a fiber that tries to take the same lock,
    * bump the counter, yield once, then cancel that fiber.
    *
    * The repetition is applied to the whole `mutex.flatMap`, so each round evaluates `mutex`
    * again.  Note that `n.++` runs as a plain side effect inside the `flatMap` callback rather
    * than inside an `IO`.
    *
    * @param mutex effect that allocates a mutex; evaluated once per round
    * @param n     counter incremented once per started-and-cancelled fiber
    */
  private def mutexKilledImpl(mutex: IO[Mutex[IO]], n: Atom.Count): IO[Unit] =
    if fibers == 1 then
      mutex
        .flatMap: m =>
          m.lock.surround:
            m.lock.use_.start.flatMap{ fiber => n.++; IO.cede >> fiber.cancel }
        .replicateA_(acquires)
    else
      mutex
        .flatMap: m =>
          m.lock.surround:
            m.lock.use_.start.flatMap{ fiber => n.++; IO.cede >> fiber.cancel }.parReplicateA_(fibers)
        .replicateA_(acquires/fibers)

  @Benchmark
  def mutexKilled(): Unit =
    val n = Atom.Count()
    mutexKilledImpl(mutex = Mutex.apply, n = n).unsafeRunSync()
    checkCount(n)

  // ---------------------------------------------------------------------------------------
  // Cats Effect AtomicCell
  // ---------------------------------------------------------------------------------------

  /** "Complete" shape using `AtomicCell.evalUpdate` as the guarded operation.
    *
    * @param cell effect that allocates the cell shared by all operations
    */
  private def atomicCellCompleteImpl(cell: IO[AtomicCell[IO, Int]]): IO[Unit] =
    if fibers == 1 then
      cell.flatMap: c =>
        c.evalUpdate(i => IO(i + 1)).replicateA_(acquires)
    else
      cell.flatMap: c =>
        c.evalUpdate(i => IO(i + 1)).parReplicateA_(fibers)
          .replicateA_(acquires/fibers)

  @Benchmark
  def atomicCellComplete(): Unit =
    atomicCellCompleteImpl(cell = AtomicCell.concurrent(0)).unsafeRunSync()

  /** "Transpose" shape using `AtomicCell.evalUpdate` as the guarded operation.
    *
    * @param cell effect that allocates the cell shared by all operations
    */
  private def atomicCellTransposeImpl(cell: IO[AtomicCell[IO, Int]]): IO[Unit] =
    if fibers == 1 then
      cell.flatMap: c =>
        c.evalUpdate(i => IO(i + 1)).replicateA_(acquires)
    else
      cell.flatMap: c =>
        c.evalUpdate(i => IO(i + 1)).replicateA_(acquires/fibers)
          .parReplicateA_(fibers)

  @Benchmark
  def atomicCellTranspose(): Unit =
    // Previously this invoked atomicCellCompleteImpl, so the Transpose shape was never measured.
    atomicCellTransposeImpl(cell = AtomicCell.concurrent(0)).unsafeRunSync()

  // ---------------------------------------------------------------------------------------
  // java.util.concurrent.Semaphore driven by kse Fu tasks
  // ---------------------------------------------------------------------------------------

  extension (s: java.util.concurrent.Semaphore)
    /** Runs `f` while holding one permit of `s`, releasing it afterwards even if `f` throws.
      *
      * The permit is acquired ''before'' entering `try`: if `acquire()` itself fails (it can
      * throw `InterruptedException`), no permit was obtained and none must be released.
      */
    inline def held[A](inline f: => A): A =
      s.acquire()
      try f
      finally s.release()

  /** Semaphore counterpart of `mutexComplete`: one guarded increment per task, `fibers` tasks
    * per round.  `bh` is supplied by JMH and is not used by the body.
    */
  @Benchmark
  def kseSemaphoreComplete(bh: Blackhole): Unit =
    val m = new java.util.concurrent.Semaphore(1, true)
    val n = Atom.Count()
    (acquires/fibers).times:
      val fs = fibers.arrayed: i =>
        Fu:
          m.held:
            n.++
      // Wait for every task of this round before starting the next one.
      fs.map(_.ask())
    checkCount(n)

  /** Semaphore counterpart of `mutexTranspose`: `fibers` tasks, each looping
    * `acquires / fibers` times.  `bh` is supplied by JMH and is not used by the body.
    */
  @Benchmark
  def kseSemaphoreTranspose(bh: Blackhole): Unit =
    val m = new java.util.concurrent.Semaphore(1, true)
    val n = Atom.Count()
    val fs = fibers.arrayed: i =>
      Fu:
        (acquires/fibers).times:
          m.held:
            n.++
    fs.map(_.ask())
    checkCount(n)

  /** Semaphore counterpart of `mutexKilled`: each task increments the counter and then
    * interrupts its own thread while still holding the permit.
    * `bh` is supplied by JMH and is not used by the body.
    */
  @Benchmark
  def kseSemaphoreKilled(bh: Blackhole): Unit =
    val m = new java.util.concurrent.Semaphore(1, true)
    val n = Atom.Count()
    (acquires/fibers).times:
      val fs = fibers.arrayed: i =>
        Fu:
          m.held:
            n.++
            Thread.currentThread.interrupt()
      fs.map(_.ask())
    checkCount(n)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
GOAL: create `fibers` tasks and run in parallel to get mutex, add 1, quit. | |
Repeat until 10,000 additions have been performed | |
VM OpenJDK.21 GraalCE.21 | |
Benchmark (fibers) Score Error Score Error Units Win | |
mutexComplete 1 3.414 ± 0.241 4.055 ± 0.082 ops/us << | |
atomicCellComplete 1 3.049 ± 0.074 3.598 ± 0.159 ops/us | |
kseSemaphoreComplete 1 0.432 ± 0.017 0.461 ± 0.014 ops/us | |
mutexComplete 5 0.108 ± 0.016 0.147 ± 0.021 ops/us | |
atomicCellComplete 5 0.090 ± 0.012 0.133 ± 0.023 ops/us | |
kseSemaphoreComplete 5 1.301 ± 0.033 1.507 ± 0.018 ops/us << | |
mutexComplete 20 0.180 ± 0.068 0.142 ± 0.018 ops/us | |
atomicCellComplete 20 0.096 ± 0.016 0.123 ± 0.011 ops/us | |
kseSemaphoreComplete 20 1.986 ± 0.021 2.240 ± 0.015 ops/us << | |
mutexComplete 100 0.113 ± 0.013 0.144 ± 0.010 ops/us | |
atomicCellComplete 100 0.096 ± 0.010 0.129 ± 0.014 ops/us | |
kseSemaphoreComplete 100 1.380 ± 0.023 1.288 ± 0.109 ops/us << | |
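GOAL: create `fibers` tasks that run in parallel to get mutex, add 1, repeat. | |
The parallel operations each quit after 10,000/fibers repeats. | |
NOTE: the atomicCellTranspose rows appear to have been recorded while that benchmark invoked atomicCellCompleteImpl, so they likely reflect the "Complete" pattern rather than this one. | |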
VM OpenJDK.21 GraalCE.21 | |
Benchmark (fibers) Score Error Score Error Units Win | |
mutexTranspose 1 3.641 ± 0.118 4.029 ± 0.043 ops/us | |
atomicCellTranspose 1 2.790 ± 0.175 3.759 ± 0.086 ops/us | |
kseSemaphoreTranspose 1 73.059 ± 1.532 76.123 ± 0.456 ops/us << | |
mutexTranspose 5 5.061 ± 0.282 5.126 ± 0.233 ops/us << | |
atomicCellTranspose 5 0.124 ± 0.045 0.122 ± 0.014 ops/us | |
kseSemaphoreTranspose 5 2.023 ± 0.027 2.501 ± 0.036 ops/us | |
mutexTranspose 20 4.801 ± 0.485 5.305 ± 0.966 ops/us << | |
atomicCellTranspose 20 0.204 ± 0.053 0.124 ± 0.015 ops/us | |
kseSemaphoreTranspose 20 1.735 ± 0.080 2.162 ± 0.083 ops/us | |
mutexTranspose 100 8.294 ± 0.647 7.011 ± 0.376 ops/us << | |
atomicCellTranspose 100 0.169 ± 0.050 0.134 ± 0.017 ops/us | |
kseSemaphoreTranspose 100 0.998 ± 0.069 0.440 ± 0.029 ops/us | |
GOAL: create `fibers` tasks that run in parallel to get mutex, add 1, | |
then cancel self! Repeat until 10,000 additions have been performed. | |
VM OpenJDK.21 GraalCE.21 | |
Benchmark (fibers) Score Error Score Error Units Win | |
mutexKilled 1 0.114 ± 0.024 0.114 ± 0.012 ops/us | |
kseSemaphoreKilled 1 0.433 ± 0.011 0.457 ± 0.009 ops/us << | |
mutexKilled 5 0.108 ± 0.039 0.077 ± 0.019 ops/us | |
kseSemaphoreKilled 5 1.179 ± 0.049 1.373 ± 0.018 ops/us << | |
mutexKilled 20 0.138 ± 0.059 0.112 ± 0.014 ops/us | |
kseSemaphoreKilled 20 1.211 ± 0.025 1.312 ± 0.011 ops/us << | |
mutexKilled 100 0.497 ± 0.271 0.191 ± 0.044 ops/us | |
kseSemaphoreKilled 100 0.871 ± 0.009 0.847 ± 0.124 ops/us << |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment