Skip to content

Instantly share code, notes, and snippets.

@experquisite
Last active August 29, 2015 14:16
Show Gist options
  • Save experquisite/43f0a30af92a8edbffe8 to your computer and use it in GitHub Desktop.
Save experquisite/43f0a30af92a8edbffe8 to your computer and use it in GitHub Desktop.
Example of rxscala .share() overhead
package sharetest
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject
class SourceSim {
private val subject = PublishSubject[Double]()
private val random = scala.util.Random
def runSimulation(reps: Long, logEvery: Option[Long]): Unit = {
// in reality, this would be data streamed from disk, hundreds of millions
// of rows
for (i <- 0L to reps) {
subject.onNext(random.nextGaussian())
logEvery.foreach(every => if (i > 0 && (i % every) == 0) println(s"Processed ${i/1000000.0}M records "))
}
}
def obs: Observable[Double] = subject
}
object Main {
def maybeShare[T](share: Boolean)(in: Observable[T]): Observable[T] =
if (share) in.share else in
def runSimple(share: Boolean): Unit = {
val sim = new SourceSim()
val obs = maybeShare(share)(sim.obs)
obs.subscribe(_ => ())
val start = System.currentTimeMillis()
val reps = 100000000L
sim.runSimulation(reps, Some(reps))
val end = System.currentTimeMillis()
val elapsed = (end - start) / 1000.0
println(s"Shared: $share, elapsed time: $elapsed")
}
def runPathological(share: Boolean): Unit = {
val sim = new SourceSim()
val obs = sim.obs
def addCalcLayer(in: Observable[Double]): Observable[Double] = {
// simulate some computation that takes non-trivial amount of time to calculate
val delayed = in.map{ x =>
var q: Long = 0
val howMany = scala.util.Random.nextLong() % 1 + 10
for (i <- 0L to howMany) q = q ^ i
x + q.toDouble / 10000000000.0
}
// maybe share, then add three subscribers
val out = maybeShare(share)(delayed)
out.subscribe(_ => ())
out.subscribe(_ => ())
out.subscribe(_ => ())
out
}
val layer1 = addCalcLayer(obs)
val layer2 = addCalcLayer(layer1)
val layer3 = addCalcLayer(layer2)
val layer4 = addCalcLayer(layer3)
val start = System.currentTimeMillis()
val reps = 1000000L
sim.runSimulation(reps, Some(reps/10))
val end = System.currentTimeMillis()
val elapsed = (end - start) / 1000.0
println(s"Shared: $share, elapsed time: $elapsed")
}
sealed trait Variety
case class Diff(x: Double) extends Variety
case class Pows(x: (Double, Double, Double, Double)) extends Variety
def runRealistic(share: Boolean): Unit = {
val sim = new SourceSim()
val obs = maybeShare(share)(sim.obs)
val pos = obs.filter(_ > 0)
val absNeg = obs.filter(_ < 0).map(-_)
val posSum = pos.scan(0.0)(_ + _)
val absNegSum = absNeg.scan(0.0)(_ + _)
val diff = Observable.combineLatest(List(posSum, absNegSum))(sums => sums.reduce(_ - _))
val sdiff = maybeShare(share)(diff)
// one guy wants the diff as it is
sdiff.subscribe(_ => ())
// one guy wants the powers
def pows(x: Double) = {
val sq = x*x
val cu = sq*x
val qu = cu*x
(x, sq, cu, qu)
}
val diffPows = sdiff.map(pows)
val sdiffPows = maybeShare(share)(diffPows)
sdiffPows.subscribe(_ => ())
// someone else wants a merged stream
val wrappedDiff = sdiff.map(Diff)
val wrappedPows = sdiffPows.map(Pows)
val merged = wrappedDiff.merge(wrappedPows)
val smerged = maybeShare(share)(merged)
smerged.subscribe(_ => ())
val start = System.currentTimeMillis()
val reps = 10000000L
sim.runSimulation(reps, Some(reps))
val end = System.currentTimeMillis()
val elapsed = (end - start) / 1000.0
println(s"Shared: $share, elapsed time: $elapsed")
}
def main(args: Array[String]): Unit = {
println("Straight through")
runSimple(false)
runSimple(true)
println("Pathological in favor of share")
runPathological(false)
runPathological(true)
println("Realistic")
runRealistic(false)
runRealistic(true)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment