Skip to content

Instantly share code, notes, and snippets.

@ktoso
Created June 18, 2015 13:09
Show Gist options
  • Save ktoso/68d47c117c547343c58b to your computer and use it in GitHub Desktop.
Save ktoso/68d47c117c547343c58b to your computer and use it in GitHub Desktop.
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import akka.routing.{NoRoutee, Routee}
import org.openjdk.jmh.annotations._
import scala.collection.immutable
/*
[info] Benchmark (size) Mode Cnt Score Error Units
[info] NumsBenchmark.atomic_int 8 thrpt 5 1045670.309 ± 213826.258 ops/ms
[info] NumsBenchmark.atomic_int 512 thrpt 5 936539.704 ± 111615.735 ops/ms
[info] NumsBenchmark.atomic_int_2 8 thrpt 5 1019928.244 ± 50521.112 ops/ms
[info] NumsBenchmark.atomic_int_2 512 thrpt 5 917039.443 ± 411388.925 ops/ms
[info] NumsBenchmark.atomic_long 8 thrpt 5 982890.076 ± 49104.190 ops/ms
[info] NumsBenchmark.atomic_long 512 thrpt 5 935921.355 ± 135357.745 ops/ms
[info] NumsBenchmark.atomic_long_2 8 thrpt 5 828560.457 ± 400167.702 ops/ms
[info] NumsBenchmark.atomic_long_2 512 thrpt 5 657202.422 ± 178080.084 ops/ms
[info] NumsBenchmark.cas_int 8 thrpt 5 788196.970 ± 272267.693 ops/ms
[info] NumsBenchmark.cas_int 512 thrpt 5 827189.532 ± 578062.957 ops/ms
[info] NumsBenchmark.cas_int2 8 thrpt 5 856313.969 ± 146156.963 ops/ms
[info] NumsBenchmark.cas_int2 512 thrpt 5 869719.344 ± 272835.734 ops/ms
*/
/**
 * JMH throughput benchmark comparing strategies for picking the next routee in
 * round-robin order: an ever-increasing `AtomicInteger`, an ever-increasing
 * `AtomicLong`, and a compare-and-set loop that keeps the counter inside
 * `[0, routees.size)`.
 *
 * NOTE(review): earlier revisions declared `size` as `final val size = 0` and built
 * `routees` in the constructor, so the routee vector was always empty and every
 * benchmark only exercised the `NoRoutee` branch. Numbers recorded with that
 * revision should be re-measured.
 */
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Threads(10)
@Fork(1)
class NumsBenchmark {

  /** Counter backing the `atomic_int*` benchmarks. */
  private final val nextInt = new AtomicInteger()

  /** Counter backing the `atomic_long*` benchmarks (previously an `AtomicInteger` by mistake). */
  private final val nextLong = new AtomicLong()

  /** Counter backing the `cas_int*` benchmarks; only ever advanced through `compareAndSet`. */
  private final val nextCas = new AtomicInteger()

  /** Number of routees; must be a mutable field so that JMH can inject each parameter value. */
  @Param(Array("8", "512"))
  var size: Int = 0

  /** Routee that discards every message, so only the selection logic is measured. */
  private val noop: Routee = new Routee {
    override def send(message: Any, sender: ActorRef): Unit = ()
  }

  /** Routees handed to the selection methods; rebuilt in [[setup]] once `size` is known. */
  var routees: Vector[Routee] = Vector.empty

  /** Builds the routee vector after JMH has injected `size`. */
  @Setup(Level.Trial)
  def setup(): Unit =
    routees = Vector.fill(size)(noop)

  @Benchmark
  def atomic_int: Routee =
    selectInt("", routees)

  @Benchmark
  def atomic_int_2: Routee =
    selectInt2("", routees)

  @Benchmark
  def atomic_long: Routee =
    selectLong("", routees)

  @Benchmark
  def atomic_long_2: Routee =
    selectLong2("", routees)

  @Benchmark
  def cas_int: Routee =
    selectCas("", routees)

  @Benchmark
  def cas_int2: Routee =
    selectCas2("", routees)

  /**
   * Round-robin selection driven by an ever-increasing `AtomicInteger`.
   *
   * @param message unused; kept for parity with the routing-logic signature
   * @param routees candidates to choose from
   * @return the selected routee, or `NoRoutee` when `routees` is empty
   */
  def selectInt(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.nonEmpty) {
      // Use the size of the argument, not the benchmark parameter, so the index always fits.
      val size = routees.size
      val nextSeq = nextInt.getAndIncrement
      // Once the counter passes Integer.MAX_VALUE it wraps to Integer.MIN_VALUE, which makes the
      // remainder negative. Math.abs would have inverted the order, breaking the direction of the
      // sequence (backward instead of forward), so the negative remainder is shifted instead.
      routees(if (nextSeq < 0) size + (nextSeq % size) - 1 else nextSeq % size)
    } else NoRoutee

  /**
   * Same algorithm as [[selectInt]], with the emptiness test written as `isEmpty` first.
   *
   * @param message unused; kept for parity with the routing-logic signature
   * @param routees candidates to choose from
   * @return the selected routee, or `NoRoutee` when `routees` is empty
   */
  def selectInt2(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee
    else {
      val size = routees.size
      val nextSeq = nextInt.getAndIncrement
      // See selectInt for why a negative remainder is shifted rather than passed to Math.abs.
      routees(if (nextSeq < 0) size + (nextSeq % size) - 1 else nextSeq % size)
    }

  /**
   * Round-robin selection driven by an ever-increasing `AtomicLong`.
   *
   * @param message unused; kept for parity with the routing-logic signature
   * @param routees candidates to choose from
   * @return the selected routee, or `NoRoutee` when `routees` is empty
   */
  def selectLong(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.nonEmpty) {
      val size = routees.size
      // The remainder of a Long by an Int-sized divisor always fits in an Int.
      val index = (nextLong.getAndIncrement % size).toInt
      routees(if (index < 0) size + index - 1 else index)
    } else NoRoutee

  /**
   * Same algorithm as [[selectLong]], with the emptiness test written as `isEmpty` first.
   *
   * @param message unused; kept for parity with the routing-logic signature
   * @param routees candidates to choose from
   * @return the selected routee, or `NoRoutee` when `routees` is empty
   */
  def selectLong2(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee
    else {
      val size = routees.size
      val index = (nextLong.getAndIncrement % size).toInt
      routees(if (index < 0) size + index - 1 else index)
    }

  /**
   * Round-robin selection using a compare-and-set loop that wraps the counter at `routees.size`.
   * If the observed counter is not below the current size (for example because the routee list
   * shrank), the counter is reset and the loop runs once more.
   *
   * @param message unused; kept for parity with the routing-logic signature
   * @param routees candidates to choose from
   * @return the selected routee, or `NoRoutee` when `routees` is empty
   */
  def selectCas(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.nonEmpty) {
      var currVal = 0
      var nextVal = 0
      val size = routees.size
      do {
        // Read without modifying: incrementing here would make the CAS below fail every time
        // unless another thread happened to move the counter back to the value just read.
        currVal = nextCas.get
        nextVal = currVal + 1
        // The CAS must run before the range test; it is what resets an out-of-range counter to 0.
      } while (!nextCas.compareAndSet(currVal, if (nextVal < size) nextVal else 0) || currVal >= size)
      routees(currVal)
    } else NoRoutee

  /**
   * Compare-and-set variant that clamps an out-of-range counter to index 0 instead of retrying.
   *
   * @param message unused; kept for parity with the routing-logic signature
   * @param routees candidates to choose from
   * @return the selected routee, or `NoRoutee` when `routees` is empty
   */
  def selectCas2(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.nonEmpty) {
      var observed = 0
      var index = 0
      val size = routees.size
      do {
        observed = nextCas.get
        // Treat a counter outside [0, size) as if it pointed at the first routee.
        index = if (observed >= 0 && observed < size) observed else 0
      } while (!nextCas.compareAndSet(observed, if (index + 1 < size) index + 1 else 0))
      routees(index)
    } else NoRoutee
}
@guidomedina
Copy link

I think there are only 3 options after all:

  • atomic_int with avg of 1m ops/ms -selectInt(...) method-
  • atomic_long with avg of 960K ops/ms -selectLong(...) method-
  • cas_int with avg of 860K ops/ms, with the following fix:

selectCas is broken: if a routee is removed, an IndexOutOfBoundsException will be thrown. Fix:

// Round-robin selection via a compare-and-set loop on `next`; retries when the CAS loses a race
// or when the observed counter is not below the current number of routees.
override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee
    else {
      val count = routees.size
      var claimed = 0
      var retry = true
      while (retry) {
        claimed = next.get
        val successor = if (claimed + 1 < count) claimed + 1 else 0
        // Always attempt the swap first: it is what resets an out-of-range counter back to 0.
        val swapped = next.compareAndSet(claimed, successor)
        // An out-of-range claim costs one extra iteration.
        retry = !swapped || claimed >= count
      }
      routees(claimed)
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment