|
package sharetest |
|
|
|
import rx.lang.scala.Observable |
|
import rx.lang.scala.subjects.PublishSubject |
|
|
|
/** Simulated data source for the benchmarks.
  *
  * Standard-normal samples are pushed through a `PublishSubject`, so only
  * observers that are already subscribed to [[obs]] when [[runSimulation]] is
  * called receive them.
  */
class SourceSim {

  private val subject = PublishSubject[Double]()

  private val random = scala.util.Random

  /** Pushes exactly `reps` Gaussian samples to the subject.
    *
    * @param reps     number of samples to emit; nothing is emitted when `reps <= 0`
    * @param logEvery when defined and positive, a progress line is printed each time
    *                 the number of emitted samples reaches a multiple of this value;
    *                 `None` or a non-positive value disables progress logging
    */
  def runSimulation(reps: Long, logEvery: Option[Long]): Unit = {
    // in reality, this would be data streamed from disk, hundreds of millions
    // of rows

    // 0 means "logging disabled"; filtering out non-positive intervals also
    // prevents a division by zero in the modulo below.
    val interval = logEvery.filter(_ > 0).getOrElse(0L)

    // A plain counter keeps the hot loop free of range/boxing overhead.
    // `emitted` is incremented right after each onNext, so it is always the
    // number of samples pushed so far and the loop emits exactly `reps` of them.
    var emitted = 0L
    while (emitted < reps) {
      subject.onNext(random.nextGaussian())
      emitted += 1
      if (interval > 0 && emitted % interval == 0)
        println(s"Processed ${emitted / 1000000.0}M records ")
    }
  }

  /** The stream of simulated samples, exposed as a read-only `Observable`. */
  def obs: Observable[Double] = subject
}
|
|
|
/** Benchmarks that time RxScala pipelines fed by a [[SourceSim]], each run once
  * without and once with `share` applied to the intermediate observables.
  */
object Main {

  /** Returns `in.share` when `share` is true and `in` itself otherwise. */
  def maybeShare[T](share: Boolean)(in: Observable[T]): Observable[T] =
    if (share) in.share else in

  /** Runs `body` once and prints how long it took, in seconds.
    *
    * `System.nanoTime` is used because it is meant for measuring elapsed time,
    * whereas `System.currentTimeMillis` follows the wall clock and can jump.
    * The value is truncated to whole milliseconds before printing so the output
    * keeps millisecond resolution.
    *
    * @param share only used to label the printed line
    * @param body  the work to time (evaluated exactly once)
    */
  private def timed(share: Boolean)(body: => Unit): Unit = {
    val start = System.nanoTime()
    body
    val elapsedMillis = (System.nanoTime() - start) / 1000000L
    val elapsed = elapsedMillis / 1000.0
    println(s"Shared: $share, elapsed time: $elapsed")
  }

  /** Times a single no-op subscriber attached directly to the source.
    *
    * @param share whether the source observable is wrapped with `share`
    */
  def runSimple(share: Boolean): Unit = {
    val sim = new SourceSim()
    val obs = maybeShare(share)(sim.obs)

    obs.subscribe(_ => ())

    val reps = 100000000L
    timed(share)(sim.runSimulation(reps, Some(reps)))
  }

  /** Times four stacked calculation layers, each with three no-op subscribers
    * and each feeding the next layer.
    *
    * @param share whether every layer's output is wrapped with `share`
    */
  def runPathological(share: Boolean): Unit = {
    val sim = new SourceSim()
    val layerCount = 4

    /** Maps `in` through a small busy-loop, optionally shares the result,
      * attaches three no-op subscribers and returns the (maybe shared) result.
      */
    def addCalcLayer(in: Observable[Double]): Observable[Double] = {
      // simulate some computation that takes non-trivial amount of time to calculate
      val delayed = in.map { x =>
        var q: Long = 0
        // NOTE(review): `nextLong() % 1` is always 0, so howMany is always 10 and
        // the random draw only contributes its own cost — confirm whether a
        // varying iteration count was intended before changing the workload.
        val howMany = scala.util.Random.nextLong() % 1 + 10
        for (i <- 0L to howMany) q = q ^ i
        x + q.toDouble / 10000000000.0
      }

      // maybe share, then add three subscribers
      val out = maybeShare(share)(delayed)
      (1 to 3).foreach(_ => out.subscribe(_ => ()))
      out
    }

    // Build the layers for their subscriptions; the final observable itself is
    // not needed afterwards.
    (1 to layerCount).foldLeft(sim.obs)((upstream, _) => addCalcLayer(upstream))

    val reps = 1000000L
    timed(share)(sim.runSimulation(reps, Some(reps / 10)))
  }

  /** Element type of the merged stream built in [[runRealistic]]. */
  sealed trait Variety

  /** Wraps a single difference value. */
  final case class Diff(x: Double) extends Variety

  /** Wraps a value together with its square, cube and fourth power. */
  final case class Pows(x: (Double, Double, Double, Double)) extends Variety

  /** Times a small pipeline: running sums of the positive and the negated
    * negative samples are combined into their difference, which is consumed
    * directly, as a tuple of powers, and as a merged stream of both.
    *
    * @param share whether the source and each intermediate stage are wrapped with `share`
    */
  def runRealistic(share: Boolean): Unit = {
    val sim = new SourceSim()
    val obs = maybeShare(share)(sim.obs)

    val pos = obs.filter(_ > 0)
    val absNeg = obs.filter(_ < 0).map(-_)
    val posSum = pos.scan(0.0)(_ + _)
    val absNegSum = absNeg.scan(0.0)(_ + _)
    // The list always holds exactly two sums, so reduce yields posSum - absNegSum.
    val diff = Observable.combineLatest(List(posSum, absNegSum))(sums => sums.reduce(_ - _))
    val sdiff = maybeShare(share)(diff)

    // first consumer: the difference as it is
    sdiff.subscribe(_ => ())

    // second consumer: the difference with its 2nd, 3rd and 4th powers
    def pows(x: Double): (Double, Double, Double, Double) = {
      val sq = x * x
      val cu = sq * x
      val qu = cu * x
      (x, sq, cu, qu)
    }
    val diffPows = sdiff.map(pows)
    val sdiffPows = maybeShare(share)(diffPows)
    sdiffPows.subscribe(_ => ())

    // third consumer: both of the above merged into one stream of Variety
    val wrappedDiff = sdiff.map(Diff(_))
    val wrappedPows = sdiffPows.map(Pows(_))
    val merged = wrappedDiff.merge(wrappedPows)
    val smerged = maybeShare(share)(merged)
    smerged.subscribe(_ => ())

    val reps = 10000000L
    timed(share)(sim.runSimulation(reps, Some(reps)))
  }

  /** Runs every scenario twice: first unshared, then shared. */
  def main(args: Array[String]): Unit = {
    println("Straight through")
    runSimple(false)
    runSimple(true)

    println("Pathological in favor of share")
    runPathological(false)
    runPathological(true)

    println("Realistic")
    runRealistic(false)
    runRealistic(true)
  }
}